2 changes: 2 additions & 0 deletions rust/.gitignore
@@ -6,3 +6,5 @@ upstream
dist
node_modules
downloaded
cubestore/target
cubesql/target
26 changes: 26 additions & 0 deletions rust/cubestore/src/config/mod.rs
@@ -16,6 +16,7 @@ use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService};
use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl};
use crate::queryplanner::{QueryPlanner, QueryPlannerImpl};
use crate::remotefs::gcs::GCSRemoteFs;
use crate::remotefs::minio::MINIORemoteFs;
use crate::remotefs::queue::QueueRemoteFs;
use crate::remotefs::s3::S3RemoteFs;
use crate::remotefs::{LocalDirRemoteFs, RemoteFs};
@@ -231,6 +232,7 @@ pub fn validate_config(c: &dyn ConfigObj) -> ValidationMessages {
}

let mut remote_vars = vec![
"CUBESTORE_MINIO_BUCKET",
"CUBESTORE_S3_BUCKET",
"CUBESTORE_GCS_BUCKET",
"CUBESTORE_REMOTE_DIR",
@@ -262,6 +264,10 @@ pub enum FileStoreProvider {
bucket_name: String,
sub_path: Option<String>,
},
MINIO {
bucket_name: String,
sub_path: Option<String>,
},
}

#[derive(Clone)]
@@ -540,6 +546,11 @@ impl Config {
),
sub_path: env::var("CUBESTORE_S3_SUB_PATH").ok(),
}
} else if let Ok(bucket_name) = env::var("CUBESTORE_MINIO_BUCKET") {
FileStoreProvider::MINIO {
bucket_name,
sub_path: env::var("CUBESTORE_MINIO_SUB_PATH").ok(),
}
} else if let Ok(bucket_name) = env::var("CUBESTORE_GCS_BUCKET") {
FileStoreProvider::GCS {
bucket_name,
@@ -795,6 +806,21 @@ impl Config {
})
.await;
}
FileStoreProvider::MINIO {
bucket_name,
sub_path,
} => {
let data_dir = self.config_obj.data_dir.clone();
let bucket_name = bucket_name.to_string();
let sub_path = sub_path.clone();
self.injector
.register("original_remote_fs", async move |_| {
let arc: Arc<dyn DIService> =
MINIORemoteFs::new(data_dir, bucket_name, sub_path).unwrap();
arc
})
.await;
}
FileStoreProvider::Local => unimplemented!(), // TODO
};
}
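For reference, the new store is selected purely through environment variables; below is a minimal sketch of a configuration that would take the FileStoreProvider::MINIO branch above. The values are placeholders for illustration; the variable names come from this change and from remotefs/minio.rs.

// Sketch only: placeholder values, set before the Config is built.
std::env::set_var("CUBESTORE_MINIO_BUCKET", "cubestore");              // enables FileStoreProvider::MINIO
std::env::set_var("CUBESTORE_MINIO_SUB_PATH", "prod");                 // optional key prefix
std::env::set_var("CUBESTORE_MINIO_ACCESS_KEY_ID", "minioadmin");
std::env::set_var("CUBESTORE_MINIO_SECRET_ACCESS_KEY", "minioadmin");
std::env::set_var("CUBESTORE_MINIO_SERVER_ENDPOINT", "localhost:9000");
std::env::set_var("CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS", "180");  // optional, defaults to 180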
302 changes: 302 additions & 0 deletions rust/cubestore/src/remotefs/minio.rs
@@ -0,0 +1,302 @@
use crate::di_service;
use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs};
use crate::util::lock::acquire_lock;
use crate::CubeError;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::cube_ext;
use log::{debug, info};
use regex::{NoExpand, Regex};
use s3::creds::Credentials;
use s3::{Bucket, Region};
use std::env;
use std::fmt;
use std::fmt::Formatter;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tempfile::NamedTempFile;
use tokio::fs;
use tokio::sync::Mutex;

pub struct MINIORemoteFs {
dir: PathBuf,
bucket: std::sync::RwLock<Bucket>,
sub_path: Option<String>,
delete_mut: Mutex<()>,
}

// TODO: Not sure if this needs any changes.
impl fmt::Debug for MINIORemoteFs {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let mut s = f.debug_struct("MINIORemoteFs");
s.field("dir", &self.dir).field("sub_path", &self.sub_path);
// Do not expose MINIO (secret) credentials.
match self.bucket.try_read() {
Ok(bucket) => s
.field("bucket_name", &bucket.name)
.field("bucket_region", &bucket.region),
Err(_) => s.field("bucket", &"<locked>"),
};
s.finish_non_exhaustive()
}
}

impl MINIORemoteFs {
pub fn new(
dir: PathBuf,
bucket_name: String,
sub_path: Option<String>,
) -> Result<Arc<Self>, CubeError> {
let key_id = env::var("CUBESTORE_MINIO_ACCESS_KEY_ID").ok();
let access_key = env::var("CUBESTORE_MINIO_SECRET_ACCESS_KEY").ok();
let minio_server_endpoint = env::var("CUBESTORE_MINIO_SERVER_ENDPOINT").ok();

let credentials =
Credentials::new(key_id.as_deref(), access_key.as_deref(), None, None, None)?;
let region = Region::Custom {
region: "".to_owned(),
endpoint: minio_server_endpoint
.as_deref()
.unwrap_or("localhost:")
.to_string(),
};
let bucket = std::sync::RwLock::new(Bucket::new_with_path_style(
&bucket_name,
region.clone(),
credentials,
)?);
let fs = Arc::new(Self {
dir,
bucket,
sub_path,
delete_mut: Mutex::new(()),
});
spawn_creds_refresh_loop(key_id, access_key, bucket_name, region, &fs);
Ok(fs)
}
}

fn spawn_creds_refresh_loop(
key_id: Option<String>,
access_key: Option<String>,
bucket_name: String,
region: Region,
fs: &Arc<MINIORemoteFs>,
) {
// Refresh credentials. TODO: use expiration time.
let refresh_every = refresh_interval_from_env();
if refresh_every.as_secs() == 0 {
return;
}
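// Hold only a weak reference so this background thread does not keep the remote FS alive.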
let fs = Arc::downgrade(fs);
std::thread::spawn(move || {
log::debug!("Started MINIO credentials refresh loop");
loop {
std::thread::sleep(refresh_every);
let fs = match fs.upgrade() {
None => {
log::debug!("Stopping MINIO credentials refresh loop");
return;
}
Some(fs) => fs,
};
let c = match Credentials::new(
key_id.as_deref(),
access_key.as_deref(),
None,
None,
None,
) {
Ok(c) => c,
Err(e) => {
log::error!("Failed to refresh minIO credentials: {}", e);
continue;
}
};
let b = match Bucket::new_with_path_style(&bucket_name, region.clone(), c) {
Ok(b) => b,
Err(e) => {
log::error!("Failed to refresh minIO credentials: {}", e);
continue;
}
};
*fs.bucket.write().unwrap() = b;
log::debug!("Successfully refreshed minIO credentials")
}
});
}

fn refresh_interval_from_env() -> Duration {
let mut mins = 180; // 3 hours by default.
if let Ok(s) = std::env::var("CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS") {
match s.parse::<u64>() {
Ok(i) => mins = i,
Err(e) => log::error!("Could not parse CUBESTORE_MINIO_CREDS_REFRESH_EVERY_MINS. Refreshing every {} minutes. Error: {}", mins, e),
};
};
Duration::from_secs(60 * mins)
}

di_service!(MINIORemoteFs, [RemoteFs]);

#[async_trait]
impl RemoteFs for MINIORemoteFs {
async fn upload_file(
&self,
temp_upload_path: &str,
remote_path: &str,
) -> Result<(), CubeError> {
let time = SystemTime::now();
debug!("Uploading {}", remote_path);
let path = self.s3_path(remote_path);
info!("path {}", remote_path);
let bucket = self.bucket.read().unwrap().clone();
let temp_upload_path_copy = temp_upload_path.to_string();
let status_code = cube_ext::spawn_blocking(move || {
bucket.put_object_stream_blocking(temp_upload_path_copy, path)
})
.await??;
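// Move the uploaded temp file into the local data dir so it doubles as a local cache.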
let local_path = self.dir.as_path().join(remote_path);
if Path::new(temp_upload_path) != local_path {
fs::create_dir_all(local_path.parent().unwrap())
.await
.map_err(|e| {
CubeError::internal(format!(
"Create dir {}: {}",
local_path.parent().as_ref().unwrap().to_string_lossy(),
e
))
})?;
fs::rename(&temp_upload_path, local_path).await?;
}
info!("Uploaded {} ({:?})", remote_path, time.elapsed()?);
if status_code != 200 {
return Err(CubeError::user(format!(
"minIO upload returned non OK status: {}",
status_code
)));
}
Ok(())
}

async fn download_file(&self, remote_path: &str) -> Result<String, CubeError> {
let local_file = self.dir.as_path().join(remote_path);
let local_dir = local_file.parent().unwrap();
let downloads_dir = local_dir.join("downloads");

let local_file_str = local_file.to_str().unwrap().to_string(); // return value.

fs::create_dir_all(&downloads_dir).await?;
if !local_file.exists() {
let time = SystemTime::now();
debug!("Downloading {}", remote_path);
let path = self.s3_path(remote_path);
info!("path {}", remote_path);
let bucket = self.bucket.read().unwrap().clone();
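// Download into a temp file under `downloads/`, then atomically persist it to the final local path.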
let status_code = cube_ext::spawn_blocking(move || -> Result<u16, CubeError> {
let (mut temp_file, temp_path) =
NamedTempFile::new_in(&downloads_dir)?.into_parts();

let res = bucket.get_object_stream_blocking(path.as_str(), &mut temp_file)?;
temp_file.flush()?;

temp_path.persist(local_file)?;

Ok(res)
})
.await??;
info!("Downloaded {} ({:?})", remote_path, time.elapsed()?);
if status_code != 200 {
return Err(CubeError::user(format!(
"minIO download returned non OK status: {}",
status_code
)));
}
}
Ok(local_file_str)
}

async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> {
let time = SystemTime::now();
debug!("Deleting {}", remote_path);
info!("remote_path {}", remote_path);
let path = self.s3_path(remote_path);
info!("path {}", remote_path);
let bucket = self.bucket.read().unwrap().clone();
let (_, status_code) =
cube_ext::spawn_blocking(move || bucket.delete_object_blocking(path)).await??;
info!("Deleting {} ({:?})", remote_path, time.elapsed()?);
if status_code != 204 {
return Err(CubeError::user(format!(
"minIO delete returned non OK status: {}",
status_code
)));
}

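// Remove the locally cached copy as well, then clean up any directories left empty.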
let _guard = acquire_lock("delete file", self.delete_mut.lock()).await?;
let local = self.dir.as_path().join(remote_path);
if fs::metadata(local.clone()).await.is_ok() {
fs::remove_file(local.clone()).await?;
LocalDirRemoteFs::remove_empty_paths(self.dir.as_path().to_path_buf(), local.clone())
.await?;
}

Ok(())
}

async fn list(&self, remote_prefix: &str) -> Result<Vec<String>, CubeError> {
Ok(self
.list_with_metadata(remote_prefix)
.await?
.into_iter()
.map(|f| f.remote_path)
.collect::<Vec<_>>())
}

async fn list_with_metadata(&self, remote_prefix: &str) -> Result<Vec<RemoteFile>, CubeError> {
let path = self.s3_path(remote_prefix);
let bucket = self.bucket.read().unwrap().clone();
let list = cube_ext::spawn_blocking(move || bucket.list_blocking(path, None)).await??;
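// Strip the sub_path prefix so the returned remote paths are relative to the remote root.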
let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap();
let result = list
.iter()
.flat_map(|(res, _)| {
res.contents
.iter()
.map(|o| -> Result<RemoteFile, CubeError> {
Ok(RemoteFile {
remote_path: leading_slash.replace(&o.key, NoExpand("")).to_string(),
updated: DateTime::parse_from_rfc3339(&o.last_modified)?
.with_timezone(&Utc),
})
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(result)
}

async fn local_path(&self) -> String {
self.dir.to_str().unwrap().to_owned()
}

async fn local_file(&self, remote_path: &str) -> Result<String, CubeError> {
let buf = self.dir.join(remote_path);
fs::create_dir_all(buf.parent().unwrap()).await?;
Ok(buf.to_str().unwrap().to_string())
}
}
//TODO
impl MINIORemoteFs {
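// Builds the object key by prefixing the optional sub_path to the remote path.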
fn s3_path(&self, remote_path: &str) -> String {
format!(
"{}/{}",
self.sub_path
.as_ref()
.map(|p| p.to_string())
.unwrap_or_else(|| "".to_string()),
remote_path
)
}
}
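A minimal usage sketch of the new remote FS, assuming a MinIO server reachable at localhost:9000 with minioadmin credentials and an existing bucket named cubestore; the endpoint, credentials, paths, and the helper name are assumptions for illustration.

use std::path::PathBuf;

// Sketch only: round-trips a small file through MinIO via MINIORemoteFs.
async fn minio_roundtrip() -> Result<(), CubeError> {
    std::env::set_var("CUBESTORE_MINIO_ACCESS_KEY_ID", "minioadmin");
    std::env::set_var("CUBESTORE_MINIO_SECRET_ACCESS_KEY", "minioadmin");
    std::env::set_var("CUBESTORE_MINIO_SERVER_ENDPOINT", "localhost:9000");

    let fs = MINIORemoteFs::new(
        PathBuf::from("/tmp/cubestore-data"),
        "cubestore".to_string(),
        None,
    )?;

    // local_file resolves (and creates parent dirs for) the path under the data dir.
    let local = fs.local_file("test.parquet").await?;
    std::fs::write(&local, b"hello")?;

    fs.upload_file(&local, "test.parquet").await?;
    // download_file returns the locally cached copy when it already exists.
    let downloaded = fs.download_file("test.parquet").await?;
    assert_eq!(std::fs::read(downloaded)?, b"hello");
    Ok(())
}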
1 change: 1 addition & 0 deletions rust/cubestore/src/remotefs/mod.rs
@@ -1,4 +1,5 @@
pub mod gcs;
pub mod minio;
pub mod queue;
pub mod s3;
