diff --git a/rust/cubestore/src/cluster/message.rs b/rust/cubestore/src/cluster/message.rs index b0cb210603992..c931972b89ae5 100644 --- a/rust/cubestore/src/cluster/message.rs +++ b/rust/cubestore/src/cluster/message.rs @@ -13,6 +13,8 @@ pub enum NetworkMessage { WarmupDownload(/*remote_path*/ String), WarmupDownloadResult(Result<(), CubeError>), + WarmupCleanup(/*remote_path*/ String), + WarmupCleanupResult(Result<(), CubeError>), MetaStoreCall(MetaStoreRpcMethodCall), MetaStoreCallResult(MetaStoreRpcMethodResult), diff --git a/rust/cubestore/src/cluster/mod.rs b/rust/cubestore/src/cluster/mod.rs index c3a276d5b0c36..23162ce9752c8 100644 --- a/rust/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/src/cluster/mod.rs @@ -60,6 +60,7 @@ pub trait Cluster: Send + Sync { fn server_name(&self) -> &str; async fn warmup_download(&self, node_name: &str, remote_path: String) -> Result<(), CubeError>; + async fn warmup_cleanup(&self, node_name: &str, remote_path: String) -> Result<(), CubeError>; fn job_result_listener(&self) -> JobResultListener; @@ -182,6 +183,17 @@ impl Cluster for ClusterImpl { } } + async fn warmup_cleanup(&self, node_name: &str, remote_path: String) -> Result<(), CubeError> { + // We only wait for the result is to ensure our request is delivered. + let response = self + .send_or_process_locally(node_name, NetworkMessage::WarmupCleanup(remote_path)) + .await?; + match response { + NetworkMessage::WarmupCleanupResult(r) => r, + _ => panic!("unexpected result for warmup cleanup"), + } + } + fn job_result_listener(&self) -> JobResultListener { JobResultListener { receiver: self.event_sender.subscribe(), @@ -578,7 +590,13 @@ impl ClusterImpl { let res = self.remote_fs.download_file(&remote_path).await; NetworkMessage::WarmupDownloadResult(res.map(|_| ())) } - NetworkMessage::SelectResult(_) | NetworkMessage::WarmupDownloadResult(_) => { + NetworkMessage::WarmupCleanup(remote_path) => { + let res = self.remote_fs.delete_local_copy(&remote_path).await; + NetworkMessage::WarmupCleanupResult(res.map(|_| ())) + } + NetworkMessage::SelectResult(_) + | NetworkMessage::WarmupDownloadResult(_) + | NetworkMessage::WarmupCleanupResult(_) => { panic!("result sent to worker"); } NetworkMessage::MetaStoreCall(_) | NetworkMessage::MetaStoreCallResult(_) => { diff --git a/rust/cubestore/src/remotefs/gcs.rs b/rust/cubestore/src/remotefs/gcs.rs index 87b4097a0d971..1f4ca6ec2ce1e 100644 --- a/rust/cubestore/src/remotefs/gcs.rs +++ b/rust/cubestore/src/remotefs/gcs.rs @@ -102,15 +102,21 @@ impl RemoteFs for GCSRemoteFs { Object::delete(self.bucket.as_str(), self.gcs_path(remote_path).as_str()).await?; info!("Deleting {} ({:?})", remote_path, time.elapsed()?); + self.delete_local_copy(remote_path).await + } + + async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> { let _guard = self.delete_mut.lock().await; let local = self.dir.as_path().join(remote_path); - if fs::metadata(local.clone()).await.is_ok() { - fs::remove_file(local.clone()).await?; - LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()) - .await?; + if let Err(e) = fs::remove_file(local.clone()).await { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(()); + } else { + return Err(e)?; + } } - - Ok(()) + // We have removed a file, cleanup. + LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()).await } async fn list(&self, remote_prefix: &str) -> Result, CubeError> { diff --git a/rust/cubestore/src/remotefs/mod.rs b/rust/cubestore/src/remotefs/mod.rs index ed8a5041ab5a7..72b4950c643bf 100644 --- a/rust/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/src/remotefs/mod.rs @@ -39,6 +39,10 @@ pub trait RemoteFs: Send + Sync + Debug { async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError>; + /// Deletes the local copy of `remote_path`, if the latter exists. + /// If no file was downloaded, nothing happens. + async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError>; + async fn list(&self, remote_prefix: &str) -> Result, CubeError>; async fn list_with_metadata(&self, remote_prefix: &str) -> Result, CubeError>; @@ -114,15 +118,21 @@ impl RemoteFs for LocalDirRemoteFs { } } - let _local_guard = self.dir_delete_mut.lock().await; + self.delete_local_copy(remote_path).await + } + + async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> { + let _guard = self.dir_delete_mut.lock().await; let local = self.dir.as_path().join(remote_path); - if fs::metadata(local.clone()).await.is_ok() { - fs::remove_file(local.clone()).await?; - LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()) - .await?; + if let Err(e) = fs::remove_file(local.clone()).await { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(()); + } else { + return Err(e)?; + } } - - Ok(()) + // We have removed a file, cleanup. + LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()).await } async fn list(&self, remote_prefix: &str) -> Result, CubeError> { diff --git a/rust/cubestore/src/remotefs/queue.rs b/rust/cubestore/src/remotefs/queue.rs index 1d4e9f580ff2c..9ce2f8b11cec7 100644 --- a/rust/cubestore/src/remotefs/queue.rs +++ b/rust/cubestore/src/remotefs/queue.rs @@ -203,6 +203,10 @@ impl RemoteFs for QueueRemoteFs { } } + async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> { + self.remote_fs.delete_local_copy(remote_path).await + } + async fn list(&self, remote_prefix: &str) -> Result, CubeError> { self.remote_fs.list(remote_prefix).await } diff --git a/rust/cubestore/src/remotefs/s3.rs b/rust/cubestore/src/remotefs/s3.rs index b611a938b08ec..9b80290e232c9 100644 --- a/rust/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/src/remotefs/s3.rs @@ -119,15 +119,21 @@ impl RemoteFs for S3RemoteFs { ))); } + self.delete_local_copy(remote_path).await + } + + async fn delete_local_copy(&self, remote_path: &str) -> Result<(), CubeError> { let _guard = self.delete_mut.lock().await; let local = self.dir.as_path().join(remote_path); - if fs::metadata(local.clone()).await.is_ok() { - fs::remove_file(local.clone()).await?; - LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()) - .await?; + if let Err(e) = fs::remove_file(local.clone()).await { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(()); + } else { + return Err(e)?; + } } - - Ok(()) + // We have removed a file, cleanup. + LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()).await } async fn list(&self, remote_prefix: &str) -> Result, CubeError> { diff --git a/rust/cubestore/src/scheduler/mod.rs b/rust/cubestore/src/scheduler/mod.rs index a244beb2aea87..a495bc337e0ba 100644 --- a/rust/cubestore/src/scheduler/mod.rs +++ b/rust/cubestore/src/scheduler/mod.rs @@ -147,6 +147,8 @@ impl SchedulerImpl { if partition.get_row().is_active() { if let Some(file_name) = partition.get_row().get_full_name(partition.get_id()) { self.remote_fs.delete_file(file_name.as_str()).await?; + self.schedule_partition_cleanup(partition.get_id(), file_name) + .await?; } } } @@ -162,6 +164,8 @@ impl SchedulerImpl { { if let Some(file_name) = partition.get_row().get_full_name(partition.get_id()) { self.remote_fs.delete_file(file_name.as_str()).await?; + self.schedule_partition_cleanup(partition.get_id(), file_name) + .await?; } } } @@ -275,4 +279,16 @@ impl SchedulerImpl { .await?; self.cluster.warmup_download(&node_name, path).await } + + async fn schedule_partition_cleanup( + &self, + partition_id: u64, + path: String, + ) -> Result<(), CubeError> { + let node_name = self + .cluster + .node_name_by_partitions(&[partition_id]) + .await?; + self.cluster.warmup_cleanup(&node_name, path).await + } }