Skip to content

Commit

Permalink
feat(cubestore): S3 sub path support
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 17, 2021
1 parent 4846080 commit 0cabd4c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
34 changes: 23 additions & 11 deletions rust/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,14 @@ impl CubeServices {
#[derive(Debug, Clone)]
pub enum FileStoreProvider {
Local,
Filesystem { remote_dir: PathBuf },
S3 { region: String, bucket_name: String },
Filesystem {
remote_dir: PathBuf,
},
S3 {
region: String,
bucket_name: String,
sub_path: Option<String>,
},
}

pub struct Config {
Expand Down Expand Up @@ -178,6 +184,7 @@ impl Config {
FileStoreProvider::S3 {
bucket_name,
region: env::var("CUBESTORE_S3_REGION").unwrap(),
sub_path: env::var("CUBESTORE_S3_SUB_PATH").ok(),
}
} else if let Ok(remote_dir) = env::var("CUBESTORE_REMOTE_DIR") {
FileStoreProvider::Filesystem {
Expand Down Expand Up @@ -210,7 +217,7 @@ impl Config {
.unwrap_or(Vec::new()),
worker_bind_address: env::var("CUBESTORE_WORKER_PORT")
.ok()
.map(|v| format!("0.0.0.0:{}", v))
.map(|v| format!("0.0.0.0:{}", v)),
}),
}
}
Expand Down Expand Up @@ -250,23 +257,26 @@ impl Config {
}

pub async fn start_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.start_test_with_options(true, test_fn).await
}

pub async fn start_test_worker<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.start_test_with_options(false, test_fn).await
}

pub async fn start_test_with_options<T>(&self, clean_remote: bool, test_fn: impl FnOnce(CubeServices) -> T)
where
pub async fn start_test_with_options<T>(
&self,
clean_remote: bool,
test_fn: impl FnOnce(CubeServices) -> T,
) where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
Expand Down Expand Up @@ -338,10 +348,12 @@ impl Config {
FileStoreProvider::S3 {
region,
bucket_name,
sub_path,
} => S3RemoteFs::new(
self.config_obj.data_dir.clone(),
region.to_string(),
bucket_name.to_string(),
sub_path.clone(),
)?,
FileStoreProvider::Local => unimplemented!(), // TODO
})
Expand Down
31 changes: 21 additions & 10 deletions rust/cubestore/src/remotefs/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@ use tokio::sync::RwLock;
pub struct S3RemoteFs {
dir: RwLock<PathBuf>,
bucket: Bucket,
sub_path: Option<String>,
}

impl S3RemoteFs {
pub fn new(dir: PathBuf, region: String, bucket_name: String) -> Result<Arc<Self>, CubeError> {
pub fn new(
dir: PathBuf,
region: String,
bucket_name: String,
sub_path: Option<String>,
) -> Result<Arc<Self>, CubeError> {
let credentials = Credentials::default()?;
let bucket = Bucket::new(&bucket_name, region.parse()?, credentials)?;
Ok(Arc::new(Self {
dir: RwLock::new(dir),
bucket,
sub_path,
}))
}
}
Expand All @@ -36,7 +43,7 @@ impl RemoteFs for S3RemoteFs {
.bucket
.put_object_stream(
self.dir.read().await.as_path().join(remote_path),
format!("/{}", remote_path),
self.s3_path(remote_path),
)
.await?;
if status_code != 200 {
Expand All @@ -57,7 +64,7 @@ impl RemoteFs for S3RemoteFs {
let mut output_file = std::fs::File::create(path.as_str())?;
let status_code = self
.bucket
.get_object_stream(S3RemoteFs::s3_path(remote_path), &mut output_file)
.get_object_stream(self.s3_path(remote_path), &mut output_file)
.await?;
if status_code != 200 {
return Err(CubeError::user(format!(
Expand All @@ -71,10 +78,7 @@ impl RemoteFs for S3RemoteFs {

async fn delete_file(&self, remote_path: &str) -> Result<(), CubeError> {
debug!("Deleting {}", remote_path);
let (_, status_code) = self
.bucket
.delete_object(S3RemoteFs::s3_path(remote_path))
.await?;
let (_, status_code) = self.bucket.delete_object(self.s3_path(remote_path)).await?;
if status_code != 204 {
return Err(CubeError::user(format!(
"S3 delete returned non OK status: {}",
Expand Down Expand Up @@ -103,7 +107,7 @@ impl RemoteFs for S3RemoteFs {
}

async fn list_with_metadata(&self, remote_prefix: &str) -> Result<Vec<RemoteFile>, CubeError> {
let list = self.bucket.list(remote_prefix.to_string(), None).await?;
let list = self.bucket.list(self.s3_path(remote_prefix), None).await?;
let leading_slash = Regex::new(r"^/").unwrap();
let result = list
.iter()
Expand Down Expand Up @@ -134,7 +138,14 @@ impl RemoteFs for S3RemoteFs {
}

impl S3RemoteFs {
fn s3_path(remote_path: &str) -> String {
format!("/{}", remote_path)
fn s3_path(&self, remote_path: &str) -> String {
format!(
"{}/{}",
self.sub_path
.as_ref()
.map(|p| format!("/{}", p))
.unwrap_or_else(|| "".to_string()),
remote_path
)
}
}

0 comments on commit 0cabd4c

Please sign in to comment.