diff --git a/rust/.gitignore b/rust/.gitignore index e0df730b3c00b..2f35ae7a21f86 100644 --- a/rust/.gitignore +++ b/rust/.gitignore @@ -6,3 +6,5 @@ upstream dist node_modules downloaded +cubestore/target +cubesql/target \ No newline at end of file diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index e8a097fcbe5fa..c9609657f2f03 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -16,6 +16,7 @@ use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService}; use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl}; use crate::queryplanner::{QueryPlanner, QueryPlannerImpl}; use crate::remotefs::gcs::GCSRemoteFs; +use crate::remotefs::minio::MINIORemoteFs; use crate::remotefs::queue::QueueRemoteFs; use crate::remotefs::s3::S3RemoteFs; use crate::remotefs::{LocalDirRemoteFs, RemoteFs}; @@ -231,6 +232,7 @@ pub fn validate_config(c: &dyn ConfigObj) -> ValidationMessages { } let mut remote_vars = vec![ + "CUBESTORE_MINIO_BUCKET", "CUBESTORE_S3_BUCKET", "CUBESTORE_GCS_BUCKET", "CUBESTORE_REMOTE_DIR", @@ -262,6 +264,10 @@ pub enum FileStoreProvider { bucket_name: String, sub_path: Option, }, + MINIO { + bucket_name: String, + sub_path: Option, + }, } #[derive(Clone)] @@ -540,6 +546,11 @@ impl Config { ), sub_path: env::var("CUBESTORE_S3_SUB_PATH").ok(), } + } else if let Ok(bucket_name) = env::var("CUBESTORE_MINIO_BUCKET") { + FileStoreProvider::MINIO { + bucket_name, + sub_path: env::var("CUBESTORE_MINIO_SUB_PATH").ok(), + } } else if let Ok(bucket_name) = env::var("CUBESTORE_GCS_BUCKET") { FileStoreProvider::GCS { bucket_name, @@ -795,6 +806,21 @@ impl Config { }) .await; } + FileStoreProvider::MINIO { + bucket_name, + sub_path, + } => { + let data_dir = self.config_obj.data_dir.clone(); + let bucket_name = bucket_name.to_string(); + let sub_path = sub_path.clone(); + self.injector + .register("original_remote_fs", async move |_| { + let arc: Arc = + MINIORemoteFs::new(data_dir, bucket_name, sub_path).unwrap(); + arc + }) + .await; + } FileStoreProvider::Local => unimplemented!(), // TODO }; } diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs new file mode 100644 index 0000000000000..01c75cc401b75 --- /dev/null +++ b/rust/cubestore/src/remotefs/minio.rs @@ -0,0 +1,302 @@ +use crate::di_service; +use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs}; +use crate::util::lock::acquire_lock; +use crate::CubeError; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use datafusion::cube_ext; +use log::{debug, info}; +use regex::{NoExpand, Regex}; +use s3::creds::Credentials; +use s3::{Bucket, Region}; +use std::env; +use std::fmt; +use std::fmt::Formatter; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tempfile::NamedTempFile; +use tokio::fs; +use tokio::sync::Mutex; + +pub struct MINIORemoteFs { + dir: PathBuf, + bucket: std::sync::RwLock, + sub_path: Option, + delete_mut: Mutex<()>, +} + +//TODO Not if this needs any changes +impl fmt::Debug for MINIORemoteFs { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let mut s = f.debug_struct("MINIORemoteFs"); + s.field("dir", &self.dir).field("sub_path", &self.sub_path); + // Do not expose MINIO (secret) credentials. + match self.bucket.try_read() { + Ok(bucket) => s + .field("bucket_name", &bucket.name) + .field("bucket_region", &bucket.region), + Err(_) => s.field("bucket", &""), + }; + s.finish_non_exhaustive() + } +} + +impl MINIORemoteFs { + pub fn new( + dir: PathBuf, + bucket_name: String, + sub_path: Option, + ) -> Result, CubeError> { + let key_id = env::var("CUBESTORE_MINIO_ACCESS_KEY_ID").ok(); + let access_key = env::var("CUBESTORE_MINIO_SECRET_ACCESS_KEY").ok(); + let minio_server_endpoint = env::var("CUBESTORE_MINIO_SERVER_ENDPOINT").ok(); + + let credentials = + Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?; + let region = Region::Custom { + region: "".to_owned(), + endpoint: minio_server_endpoint + .as_deref() + .unwrap_or("localhost:") + .to_string(), + }; + let bucket = std::sync::RwLock::new(Bucket::new_with_path_style( + &bucket_name, + region.clone(), + credentials, + )?); + let fs = Arc::new(Self { + dir, + bucket, + sub_path, + delete_mut: Mutex::new(()), + }); + spawn_creds_refresh_loop(key_id, access_key, bucket_name, region, &fs); + Ok(fs) + } +} + +fn spawn_creds_refresh_loop( + key_id: Option, + access_key: Option, + bucket_name: String, + region: Region, + fs: &Arc, +) { + // Refresh credentials. TODO: use expiration time. + let refresh_every = refresh_interval_from_env(); + if refresh_every.as_secs() == 0 { + return; + } + let fs = Arc::downgrade(fs); + std::thread::spawn(move || { + log::debug!("Started MINIO credentials refresh loop"); + loop { + std::thread::sleep(refresh_every); + let fs = match fs.upgrade() { + None => { + log::debug!("Stopping MINIO credentials refresh loop"); + return; + } + Some(fs) => fs, + }; + let c = match Credentials::new( + key_id.as_deref(), + access_key.as_deref(), + None, + None, + None, + ) { + Ok(c) => c, + Err(e) => { + log::error!("Failed to refresh minIO credentials: {}", e); + continue; + } + }; + let b = match Bucket::new_with_path_style(&bucket_name, region.clone(), c) { + Ok(b) => b, + Err(e) => { + log::error!("Failed to refresh minIO credentials: {}", e); + continue; + } + }; + *fs.bucket.write().unwrap() = b; + log::debug!("Successfully refreshed minIO credentials") + } + }); +} + +fn refresh_interval_from_env() -> Duration { + let mut mins = 180; // 3 hours by default. + if let Ok(s) = std::env::var("CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS") { + match s.parse::() { + Ok(i) => mins = i, + Err(e) => log::error!("Could not parse CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS. Refreshing every {} minutes. Error: {}", mins, e), + }; + }; + Duration::from_secs(60 * mins) +} + +di_service!(MINIORemoteFs, [RemoteFs]); + +#[async_trait] +impl RemoteFs for MINIORemoteFs { + async fn upload_file( + &self, + temp_upload_path: &str, + remote_path: &str, + ) -> Result<(), CubeError> { + let time = SystemTime::now(); + debug!("Uploading {}", remote_path); + let path = self.s3_path(remote_path); + info!("path {}", remote_path); + let bucket = self.bucket.read().unwrap().clone(); + let temp_upload_path_copy = temp_upload_path.to_string(); + let status_code = cube_ext::spawn_blocking(move || { + bucket.put_object_stream_blocking(temp_upload_path_copy, path) + }) + .await??; + let local_path = self.dir.as_path().join(remote_path); + if Path::new(temp_upload_path) != local_path { + fs::create_dir_all(local_path.parent().unwrap()) + .await + .map_err(|e| { + CubeError::internal(format!( + "Create dir {}: {}", + local_path.parent().as_ref().unwrap().to_string_lossy(), + e + )) + })?; + fs::rename(&temp_upload_path, local_path).await?; + } + info!("Uploaded {} ({:?})", remote_path, time.elapsed()?); + if status_code != 200 { + return Err(CubeError::user(format!( + "minIO upload returned non OK status: {}", + status_code + ))); + } + Ok(()) + } + + async fn download_file(&self, remote_path: &str) -> Result { + let local_file = self.dir.as_path().join(remote_path); + let local_dir = local_file.parent().unwrap(); + let downloads_dir = local_dir.join("downloads"); + + let local_file_str = local_file.to_str().unwrap().to_string(); // return value. + + fs::create_dir_all(&downloads_dir).await?; + if !local_file.exists() { + let time = SystemTime::now(); + debug!("Downloading {}", remote_path); + let path = self.s3_path(remote_path); + info!("path {}", remote_path); + let bucket = self.bucket.read().unwrap().clone(); + let status_code = cube_ext::spawn_blocking(move || -> Result { + let (mut temp_file, temp_path) = + NamedTempFile::new_in(&downloads_dir)?.into_parts(); + + let res = bucket.get_object_stream_blocking(path.as_str(), &mut temp_file)?; + temp_file.flush()?; + + temp_path.persist(local_file)?; + + Ok(res) + }) + .await??; + info!("Downloaded {} ({:?})", remote_path, time.elapsed()?); + if status_code != 200 { + return Err(CubeError::user(format!( + "minIO download returned non OK status: {}", + status_code + ))); + } + } + Ok(local_file_str) + } + + async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> { + let time = SystemTime::now(); + debug!("Deleting {}", remote_path); + info!("remote_path {}", remote_path); + let path = self.s3_path(remote_path); + info!("path {}", remote_path); + let bucket = self.bucket.read().unwrap().clone(); + let (_, status_code) = + cube_ext::spawn_blocking(move || bucket.delete_object_blocking(path)).await??; + info!("Deleting {} ({:?})", remote_path, time.elapsed()?); + if status_code != 204 { + return Err(CubeError::user(format!( + "minIO delete returned non OK status: {}", + status_code + ))); + } + + let _guard = acquire_lock("delete file", self.delete_mut.lock()).await?; + let local = self.dir.as_path().join(remote_path); + if fs::metadata(local.clone()).await.is_ok() { + fs::remove_file(local.clone()).await?; + LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()) + .await?; + } + + Ok(()) + } + + async fn list(&self, remote_prefix: &str) -> Result, CubeError> { + Ok(self + .list_with_metadata(remote_prefix) + .await? + .into_iter() + .map(|f| f.remote_path) + .collect::>()) + } + + async fn list_with_metadata(&self, remote_prefix: &str) -> Result, CubeError> { + let path = self.s3_path(remote_prefix); + let bucket = self.bucket.read().unwrap().clone(); + let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??; + let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap(); + let result = list + .iter() + .flat_map(|(res, _)| { + res.contents + .iter() + .map(|o| -> Result { + Ok(RemoteFile { + remote_path: leading_slash.replace(&o.key, NoExpand("")).to_string(), + updated: DateTime::parse_from_rfc3339(&o.last_modified)? + .with_timezone(&Utc), + }) + }) + }) + .collect::, _>>()?; + Ok(result) + } + + async fn local_path(&self) -> String { + self.dir.to_str().unwrap().to_owned() + } + + async fn local_file(&self, remote_path: &str) -> Result { + let buf = self.dir.join(remote_path); + fs::create_dir_all(buf.parent().unwrap()).await?; + Ok(buf.to_str().unwrap().to_string()) + } +} +//TODO +impl MINIORemoteFs { + fn s3_path(&self, remote_path: &str) -> String { + format!( + "{}/{}", + self.sub_path + .as_ref() + .map(|p| p.to_string()) + .unwrap_or_else(|| "".to_string()), + remote_path + ) + } +} diff --git a/rust/cubestore/src/remotefs/mod.rs b/rust/cubestore/src/remotefs/mod.rs index 36d3c06ff7c65..384ec0f338e4a 100644 --- a/rust/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/src/remotefs/mod.rs @@ -1,4 +1,5 @@ pub mod gcs; +pub mod minio; pub mod queue; pub mod s3;