From f3ada4dcb05e9ba06bd38a6f82a67254ec8b9501 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 15:25:44 +0200 Subject: [PATCH 01/10] [feat] minio intergration --- rust/cubestore/src/config/mod.rs | 30 +++ rust/cubestore/src/remotefs/minio.rs | 302 +++++++++++++++++++++++++++ rust/cubestore/src/remotefs/s3.rs | 1 + 3 files changed, 333 insertions(+) create mode 100644 rust/cubestore/src/remotefs/minio.rs diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index e8a097fcbe5fa..3ca61e37543f6 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -18,6 +18,7 @@ use crate::queryplanner::{QueryPlanner, QueryPlannerImpl}; use crate::remotefs::gcs::GCSRemoteFs; use crate::remotefs::queue::QueueRemoteFs; use crate::remotefs::s3::S3RemoteFs; +use crate::remotefs::minio::MINIORemoteFs; use crate::remotefs::{LocalDirRemoteFs, RemoteFs}; use crate::scheduler::SchedulerImpl; use crate::sql::{SqlService, SqlServiceImpl}; @@ -231,6 +232,7 @@ pub fn validate_config(c: &dyn ConfigObj) -> ValidationMessages { } let mut remote_vars = vec![ + "CUBESTORE_MINIO_BUCKET" "CUBESTORE_S3_BUCKET", "CUBESTORE_GCS_BUCKET", "CUBESTORE_REMOTE_DIR", @@ -262,6 +264,12 @@ pub enum FileStoreProvider { bucket_name: String, sub_path: Option, }, + MINIO { + region: String, + bucket_name: String, + server_url: String, + sub_path: Option, + }, } #[derive(Clone)] @@ -540,6 +548,11 @@ impl Config { ), sub_path: env::var("CUBESTORE_S3_SUB_PATH").ok(), } + } else if let Ok(bucket_name) = env::var("CUBESTORE_MINIO_BUCKET") { + FileStoreProvider::MINIO { + bucket_name, + sub_path: env::var("CUBESTORE_MINIO_SUB_PATH").ok(), + } } else if let Ok(bucket_name) = env::var("CUBESTORE_GCS_BUCKET") { FileStoreProvider::GCS { bucket_name, @@ -795,6 +808,23 @@ impl Config { }) .await; } + FileStoreProvider::MINIO { + region, + bucket_name, + sub_path, + } => { + let data_dir = self.config_obj.data_dir.clone(); + let region = region.to_string(); + let bucket_name = bucket_name.to_string(); + let sub_path = sub_path.clone(); + self.injector + .register("original_remote_fs", async move |_| { + let arc: Arc = + MINIORemoteFs::new(data_dir, region, bucket_name, sub_path).unwrap(); + arc + }) + .await; + } FileStoreProvider::Local => unimplemented!(), // TODO }; } diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs new file mode 100644 index 0000000000000..07cea45f002bd --- /dev/null +++ b/rust/cubestore/src/remotefs/minio.rs @@ -0,0 +1,302 @@ +use crate::di_service; +use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs}; +use crate::util::lock::acquire_lock; +use crate::CubeError; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use datafusion::cube_ext; +use log::{debug, info}; +use regex::{NoExpand, Regex}; +use s3::creds::Credentials; +use s3::{Bucket, Region}; +use std::env; +use std::fmt; +use std::fmt::Formatter; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tempfile::NamedTempFile; +use tokio::fs; +use tokio::sync::Mutex; + +pub struct MINIORemoteFs { + dir: PathBuf, + bucket: std::sync::RwLock, + sub_path: Option, + delete_mut: Mutex<()>, +} + + +//TODO Not if this needs any changes +impl fmt::Debug for MINIORemoteFs { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let mut s = f.debug_struct("MINIORemoteFs"); + s.field("dir", &self.dir).field("sub_path", &self.sub_path); + // Do not expose MINIO (secret) credentials. + match self.bucket.try_read() { + Ok(bucket) => s + .field("bucket_name", &bucket.name) + .field("bucket_region", &bucket.region), + Err(_) => s.field("bucket", &""), + }; + s.finish_non_exhaustive() + } +} + +impl MINIORemoteFs { + pub fn new( + dir: PathBuf, + region: String, + bucket_name: String, + sub_path: Option, + ) -> Result, CubeError> { + let key_id = env::var("CUBESTORE_MINIO_ACCESS_KEY_ID").ok(); + let access_key = env::var("CUBESTORE_MINIO_SECRET_ACCESS_KEY").ok(); + let minio_server_endpoint = env::var("CUBESTORE_MINIO_SERVER_ENDPOINT").ok(); + + let credentials = + Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?; + let region = Region::Custom { + region: "".to_owned(), + endpoint: minio_server_endpoint.as_deref(), + }, + let bucket = + std::sync::RwLock::new(Bucket::new(&bucket_name, region.clone(), credentials)?); + let fs = Arc::new(Self { + dir, + bucket, + sub_path, + delete_mut: Mutex::new(()), + }); + spawn_creds_refresh_loop(key_id, access_key, bucket_name, region, &fs); + Ok(fs) + } +} + +fn spawn_creds_refresh_loop( + key_id: Option, + access_key: Option, + bucket_name: String, + region: Region, + fs: &Arc, +) { + // Refresh credentials. TODO: use expiration time. + let refresh_every = refresh_interval_from_env(); + if refresh_every.as_secs() == 0 { + return; + } + let fs = Arc::downgrade(fs); + std::thread::spawn(move || { + log::debug!("Started MINIO credentials refresh loop"); + loop { + std::thread::sleep(refresh_every); + let fs = match fs.upgrade() { + None => { + log::debug!("Stopping MINIO credentials refresh loop"); + return; + } + Some(fs) => fs, + }; + let c = match Credentials::new( + key_id.as_deref(), + access_key.as_deref(), + None, + None, + None, + ) { + Ok(c) => c, + Err(e) => { + log::error!("Failed to refresh minIO credentials: {}", e); + continue; + } + }; + let b = match Bucket::new(&bucket_name, region.clone(), c) { + Ok(b) => b, + Err(e) => { + log::error!("Failed to refresh minIO credentials: {}", e); + continue; + } + }; + *fs.bucket.write().unwrap() = b; + log::debug!("Successfully refreshed minIO credentials") + } + }); +} + +fn refresh_interval_from_env() -> Duration { + let mut mins = 180; // 3 hours by default. + if let Ok(s) = std::env::var("CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS") { + match s.parse::() { + Ok(i) => mins = i, + Err(e) => log::error!("Could not parse CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS. Refreshing every {} minutes. Error: {}", mins, e), + }; + }; + Duration::from_secs(60 * mins) +} + + +di_service!(MINIORemoteFs, [RemoteFs]); + +#[async_trait] +impl RemoteFs for MINIORemoteFs { + //TODO + async fn upload_file( + &self, + temp_upload_path: &str, + remote_path: &str, + ) -> Result<(), CubeError> { + let time = SystemTime::now(); + debug!("Uploading {}", remote_path); + info!("remote_path {}", remote_path); + let path = self.s3_path(remote_path); + info!("path {}", remote_path); + let bucket = self.bucket.read().unwrap().clone(); + let temp_upload_path_copy = temp_upload_path.to_string(); + let status_code = cube_ext::spawn_blocking(move || { + bucket.put_object_stream_blocking(temp_upload_path_copy, path) + }) + .await??; + let local_path = self.dir.as_path().join(remote_path); + if Path::new(temp_upload_path) != local_path { + fs::create_dir_all(local_path.parent().unwrap()) + .await + .map_err(|e| { + CubeError::internal(format!( + "Create dir {}: {}", + local_path.parent().as_ref().unwrap().to_string_lossy(), + e + )) + })?; + fs::rename(&temp_upload_path, local_path).await?; + } + info!("Uploaded {} ({:?})", remote_path, time.elapsed()?); + if status_code != 200 { + return Err(CubeError::user(format!( + "minIO upload returned non OK status: {}", + status_code + ))); + } + Ok(()) + } + //TODO + async fn download_file(&self, remote_path: &str) -> Result { + let local_file = self.dir.as_path().join(remote_path); + let local_dir = local_file.parent().unwrap(); + let downloads_dir = local_dir.join("downloads"); + + let local_file_str = local_file.to_str().unwrap().to_string(); // return value. + + fs::create_dir_all(&downloads_dir).await?; + if !local_file.exists() { + let time = SystemTime::now(); + debug!("Downloading {}", remote_path); + info!("remote_path {}", remote_path); + let path = self.s3_path(remote_path); + info!("path {}", remote_path); + let bucket = self.bucket.read().unwrap().clone(); + let status_code = cube_ext::spawn_blocking(move || -> Result { + let (mut temp_file, temp_path) = + NamedTempFile::new_in(&downloads_dir)?.into_parts(); + + let res = bucket.get_object_stream_blocking(path.as_str(), &mut temp_file)?; + temp_file.flush()?; + + temp_path.persist(local_file)?; + + Ok(res) + }) + .await??; + info!("Downloaded {} ({:?})", remote_path, time.elapsed()?); + if status_code != 200 { + return Err(CubeError::user(format!( + "minIO download returned non OK status: {}", + status_code + ))); + } + } + Ok(local_file_str) + } + //TODO + async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> { + let time = SystemTime::now(); + debug!("Deleting {}", remote_path); + info!("remote_path {}", remote_path); + let path = self.s3_path(remote_path); + info!("path {}", remote_path); + let bucket = self.bucket.read().unwrap().clone(); + let (_, status_code) = + cube_ext::spawn_blocking(move || bucket.delete_object_blocking(path)).await??; + info!("Deleting {} ({:?})", remote_path, time.elapsed()?); + if status_code != 204 { + return Err(CubeError::user(format!( + "minIO delete returned non OK status: {}", + status_code + ))); + } + + let _guard = acquire_lock("delete file", self.delete_mut.lock()).await?; + let local = self.dir.as_path().join(remote_path); + if fs::metadata(local.clone()).await.is_ok() { + fs::remove_file(local.clone()).await?; + LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone()) + .await?; + } + + Ok(()) + } + //TODO + async fn list(&self, remote_prefix: &str) -> Result, CubeError> { + Ok(self + .list_with_metadata(remote_prefix) + .await? + .into_iter() + .map(|f| f.remote_path) + .collect::>()) + } + //TODO + async fn list_with_metadata(&self, remote_prefix: &str) -> Result, CubeError> { + let path = self.s3_path(remote_prefix); + let bucket = self.bucket.read().unwrap().clone(); + let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??; + let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap(); + let result = list + .iter() + .flat_map(|(res, _)| { + res.contents + .iter() + .map(|o| -> Result { + Ok(RemoteFile { + remote_path: leading_slash.replace(&o.key, NoExpand("")).to_string(), + updated: DateTime::parse_from_rfc3339(&o.last_modified)? + .with_timezone(&Utc), + }) + }) + }) + .collect::, _>>()?; + Ok(result) + } + //TODO + async fn local_path(&self) -> String { + self.dir.to_str().unwrap().to_owned() + } + //TODO + async fn local_file(&self, remote_path: &str) -> Result { + let buf = self.dir.join(remote_path); + fs::create_dir_all(buf.parent().unwrap()).await?; + Ok(buf.to_str().unwrap().to_string()) + } +} +//TODO +impl MINIORemoteFs { + fn s3_path(&self, remote_path: &str) -> String { + format!( + "{}/{}", + self.sub_path + .as_ref() + .map(|p| p.to_string()) + .unwrap_or_else(|| "".to_string()), + remote_path + ) + } +} diff --git a/rust/cubestore/src/remotefs/s3.rs b/rust/cubestore/src/remotefs/s3.rs index 629b596b57627..c9562344babf0 100644 --- a/rust/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/src/remotefs/s3.rs @@ -27,6 +27,7 @@ pub struct S3RemoteFs { delete_mut: Mutex<()>, } + impl fmt::Debug for S3RemoteFs { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let mut s = f.debug_struct("S3RemoteFs"); From 89446b2d86f5d61e280c9f6ad4f8d542f09090dd Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 16:41:45 +0200 Subject: [PATCH 02/10] [bug-fix] removed errors in code to compile correctly --- rust/cubestore/src/config/mod.rs | 5 ++--- rust/cubestore/src/remotefs/minio.rs | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index 3ca61e37543f6..e3dd1bb1623a1 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -232,7 +232,7 @@ pub fn validate_config(c: &dyn ConfigObj) -> ValidationMessages { } let mut remote_vars = vec![ - "CUBESTORE_MINIO_BUCKET" + "CUBESTORE_MINIO_BUCKET", "CUBESTORE_S3_BUCKET", "CUBESTORE_GCS_BUCKET", "CUBESTORE_REMOTE_DIR", @@ -267,7 +267,6 @@ pub enum FileStoreProvider { MINIO { region: String, bucket_name: String, - server_url: String, sub_path: Option, }, } @@ -820,7 +819,7 @@ impl Config { self.injector .register("original_remote_fs", async move |_| { let arc: Arc = - MINIORemoteFs::new(data_dir, region, bucket_name, sub_path).unwrap(); + MINIORemoteFs::new(data_dir, bucket_name, sub_path).unwrap(); arc }) .await; diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs index 07cea45f002bd..071dee0ac685f 100644 --- a/rust/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/src/remotefs/minio.rs @@ -47,7 +47,6 @@ impl fmt::Debug for MINIORemoteFs { impl MINIORemoteFs { pub fn new( dir: PathBuf, - region: String, bucket_name: String, sub_path: Option, ) -> Result, CubeError> { From 8078e160b3bb9ae4fc9c6309a27adbdbc96c0ed3 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 21:49:02 +0200 Subject: [PATCH 03/10] [Linting] --- rust/cubestore/src/config/mod.rs | 4 ++-- rust/cubestore/src/remotefs/minio.rs | 20 +++++++++----------- rust/cubestore/src/remotefs/s3.rs | 1 - 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index e3dd1bb1623a1..0a81061ffd4b2 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -16,9 +16,9 @@ use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService}; use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl}; use crate::queryplanner::{QueryPlanner, QueryPlannerImpl}; use crate::remotefs::gcs::GCSRemoteFs; +use crate::remotefs::minio::MINIORemoteFs; use crate::remotefs::queue::QueueRemoteFs; use crate::remotefs::s3::S3RemoteFs; -use crate::remotefs::minio::MINIORemoteFs; use crate::remotefs::{LocalDirRemoteFs, RemoteFs}; use crate::scheduler::SchedulerImpl; use crate::sql::{SqlService, SqlServiceImpl}; @@ -819,7 +819,7 @@ impl Config { self.injector .register("original_remote_fs", async move |_| { let arc: Arc = - MINIORemoteFs::new(data_dir, bucket_name, sub_path).unwrap(); + MINIORemoteFs::new(data_dir, bucket_name, sub_path).unwrap(); arc }) .await; diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs index 071dee0ac685f..025958d8d48a8 100644 --- a/rust/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/src/remotefs/minio.rs @@ -61,7 +61,7 @@ impl MINIORemoteFs { endpoint: minio_server_endpoint.as_deref(), }, let bucket = - std::sync::RwLock::new(Bucket::new(&bucket_name, region.clone(), credentials)?); + std::sync::RwLock::new(Bucket::new_with_path_style(&bucket_name, region.clone(), credentials)?); let fs = Arc::new(Self { dir, bucket, @@ -110,7 +110,7 @@ fn spawn_creds_refresh_loop( continue; } }; - let b = match Bucket::new(&bucket_name, region.clone(), c) { + let b = match Bucket::new_with_path_style(&bucket_name, region.clone(), c) { Ok(b) => b, Err(e) => { log::error!("Failed to refresh minIO credentials: {}", e); @@ -139,7 +139,7 @@ di_service!(MINIORemoteFs, [RemoteFs]); #[async_trait] impl RemoteFs for MINIORemoteFs { - //TODO + async fn upload_file( &self, temp_upload_path: &str, @@ -147,7 +147,6 @@ impl RemoteFs for MINIORemoteFs { ) -> Result<(), CubeError> { let time = SystemTime::now(); debug!("Uploading {}", remote_path); - info!("remote_path {}", remote_path); let path = self.s3_path(remote_path); info!("path {}", remote_path); let bucket = self.bucket.read().unwrap().clone(); @@ -178,7 +177,7 @@ impl RemoteFs for MINIORemoteFs { } Ok(()) } - //TODO + async fn download_file(&self, remote_path: &str) -> Result { let local_file = self.dir.as_path().join(remote_path); let local_dir = local_file.parent().unwrap(); @@ -190,7 +189,6 @@ impl RemoteFs for MINIORemoteFs { if !local_file.exists() { let time = SystemTime::now(); debug!("Downloading {}", remote_path); - info!("remote_path {}", remote_path); let path = self.s3_path(remote_path); info!("path {}", remote_path); let bucket = self.bucket.read().unwrap().clone(); @@ -216,7 +214,7 @@ impl RemoteFs for MINIORemoteFs { } Ok(local_file_str) } - //TODO + async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> { let time = SystemTime::now(); debug!("Deleting {}", remote_path); @@ -244,7 +242,7 @@ impl RemoteFs for MINIORemoteFs { Ok(()) } - //TODO + async fn list(&self, remote_prefix: &str) -> Result, CubeError> { Ok(self .list_with_metadata(remote_prefix) @@ -253,7 +251,7 @@ impl RemoteFs for MINIORemoteFs { .map(|f| f.remote_path) .collect::>()) } - //TODO + async fn list_with_metadata(&self, remote_prefix: &str) -> Result, CubeError> { let path = self.s3_path(remote_prefix); let bucket = self.bucket.read().unwrap().clone(); @@ -275,11 +273,11 @@ impl RemoteFs for MINIORemoteFs { .collect::, _>>()?; Ok(result) } - //TODO + async fn local_path(&self) -> String { self.dir.to_str().unwrap().to_owned() } - //TODO + async fn local_file(&self, remote_path: &str) -> Result { let buf = self.dir.join(remote_path); fs::create_dir_all(buf.parent().unwrap()).await?; diff --git a/rust/cubestore/src/remotefs/s3.rs b/rust/cubestore/src/remotefs/s3.rs index c9562344babf0..629b596b57627 100644 --- a/rust/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/src/remotefs/s3.rs @@ -27,7 +27,6 @@ pub struct S3RemoteFs { delete_mut: Mutex<()>, } - impl fmt::Debug for S3RemoteFs { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let mut s = f.debug_struct("S3RemoteFs"); From bc388ffbe93011371c075681cb0bb838c3680699 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 22:03:15 +0200 Subject: [PATCH 04/10] [fix] missing import minio --- rust/cubestore/src/config/mod.rs | 1 - rust/cubestore/src/remotefs/mod.rs | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index 0a81061ffd4b2..7897fbad3c939 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -265,7 +265,6 @@ pub enum FileStoreProvider { sub_path: Option, }, MINIO { - region: String, bucket_name: String, sub_path: Option, }, diff --git a/rust/cubestore/src/remotefs/mod.rs b/rust/cubestore/src/remotefs/mod.rs index 36d3c06ff7c65..1747e663c0850 100644 --- a/rust/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/src/remotefs/mod.rs @@ -1,6 +1,7 @@ pub mod gcs; pub mod queue; pub mod s3; +pub mod minio; use crate::config::injection::DIService; use crate::di_service; From a73ba2f1bdf7044bf3ca41e27c22b8b75e982441 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 22:06:45 +0200 Subject: [PATCH 05/10] [bug-fix] --- rust/cubestore/src/remotefs/minio.rs | 16 ++++++++-------- rust/cubestore/src/remotefs/mod.rs | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs index 025958d8d48a8..62ac26e758471 100644 --- a/rust/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/src/remotefs/minio.rs @@ -27,7 +27,6 @@ pub struct MINIORemoteFs { delete_mut: Mutex<()>, } - //TODO Not if this needs any changes impl fmt::Debug for MINIORemoteFs { fn fmt(&self, f: &mut Formatter) -> fmt::Result { @@ -56,12 +55,15 @@ impl MINIORemoteFs { let credentials = Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?; - let region = Region::Custom { + let region = Region::Custom { region: "".to_owned(), endpoint: minio_server_endpoint.as_deref(), - }, - let bucket = - std::sync::RwLock::new(Bucket::new_with_path_style(&bucket_name, region.clone(), credentials)?); + }; + let bucket = std::sync::RwLock::new(Bucket::new_with_path_style( + &bucket_name, + region.clone(), + credentials, + )?); let fs = Arc::new(Self { dir, bucket, @@ -134,12 +136,10 @@ fn refresh_interval_from_env() -> Duration { Duration::from_secs(60 * mins) } - di_service!(MINIORemoteFs, [RemoteFs]); #[async_trait] impl RemoteFs for MINIORemoteFs { - async fn upload_file( &self, temp_upload_path: &str, @@ -284,7 +284,7 @@ impl RemoteFs for MINIORemoteFs { Ok(buf.to_str().unwrap().to_string()) } } -//TODO +//TODO impl MINIORemoteFs { fn s3_path(&self, remote_path: &str) -> String { format!( diff --git a/rust/cubestore/src/remotefs/mod.rs b/rust/cubestore/src/remotefs/mod.rs index 1747e663c0850..384ec0f338e4a 100644 --- a/rust/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/src/remotefs/mod.rs @@ -1,7 +1,7 @@ pub mod gcs; +pub mod minio; pub mod queue; pub mod s3; -pub mod minio; use crate::config::injection::DIService; use crate::di_service; From c5f0fedb88766097b9e8a3df899a01324db642d2 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 22:22:26 +0200 Subject: [PATCH 06/10] [bug-fix] --- rust/cubestore/src/config/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index 7897fbad3c939..957a83621f0b0 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -807,7 +807,6 @@ impl Config { .await; } FileStoreProvider::MINIO { - region, bucket_name, sub_path, } => { From a3d3fe06037a911d6f75c062ed5bbf97cdf59b65 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 22:41:13 +0200 Subject: [PATCH 07/10] [bug-fix] --- rust/cubestore/src/config/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index 957a83621f0b0..c9609657f2f03 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -811,7 +811,6 @@ impl Config { sub_path, } => { let data_dir = self.config_obj.data_dir.clone(); - let region = region.to_string(); let bucket_name = bucket_name.to_string(); let sub_path = sub_path.clone(); self.injector From 264de63476b7dc50b4f02e2463ccb0860cf676b3 Mon Sep 17 00:00:00 2001 From: Antitankers Date: Thu, 25 Nov 2021 22:53:01 +0200 Subject: [PATCH 08/10] [bug-fix] --- rust/cubestore/src/remotefs/minio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs index 62ac26e758471..12357ff231934 100644 --- a/rust/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/src/remotefs/minio.rs @@ -57,7 +57,7 @@ impl MINIORemoteFs { Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?; let region = Region::Custom { region: "".to_owned(), - endpoint: minio_server_endpoint.as_deref(), + endpoint: minio_server_endpoint.to_string().as_deref(), }; let bucket = std::sync::RwLock::new(Bucket::new_with_path_style( &bucket_name, From 1d5a3a434d9256d6aa7da0da9ff0d1790865b8d7 Mon Sep 17 00:00:00 2001 From: Pieter van Zyl Date: Fri, 26 Nov 2021 10:34:37 +0200 Subject: [PATCH 09/10] [bug-fixes] --- rust/.gitignore | 2 ++ rust/cubestore/src/remotefs/minio.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/.gitignore b/rust/.gitignore index e0df730b3c00b..2f35ae7a21f86 100644 --- a/rust/.gitignore +++ b/rust/.gitignore @@ -6,3 +6,5 @@ upstream dist node_modules downloaded +cubestore/target +cubesql/target \ No newline at end of file diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs index 12357ff231934..898625353248b 100644 --- a/rust/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/src/remotefs/minio.rs @@ -57,7 +57,7 @@ impl MINIORemoteFs { Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?; let region = Region::Custom { region: "".to_owned(), - endpoint: minio_server_endpoint.to_string().as_deref(), + endpoint: minio_server_endpoint.as_deref().unwrap_or("localhost:").to_string(), }; let bucket = std::sync::RwLock::new(Bucket::new_with_path_style( &bucket_name, From 14dbc1b4aa95d77cb86d41a1e58e9e59d8af5bfc Mon Sep 17 00:00:00 2001 From: Pieter van Zyl Date: Fri, 26 Nov 2021 13:34:19 +0200 Subject: [PATCH 10/10] [linting] cargo fmt --- rust/cubestore/src/remotefs/minio.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust/cubestore/src/remotefs/minio.rs b/rust/cubestore/src/remotefs/minio.rs index 898625353248b..01c75cc401b75 100644 --- a/rust/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/src/remotefs/minio.rs @@ -57,7 +57,10 @@ impl MINIORemoteFs { Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?; let region = Region::Custom { region: "".to_owned(), - endpoint: minio_server_endpoint.as_deref().unwrap_or("localhost:").to_string(), + endpoint: minio_server_endpoint + .as_deref() + .unwrap_or("localhost:") + .to_string(), }; let bucket = std::sync::RwLock::new(Bucket::new_with_path_style( &bucket_name,