Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add possibility to configure a subfolder inside an S3 bucket #2647

Merged
merged 10 commits into from
Aug 28, 2023
24 changes: 18 additions & 6 deletions rust/hyperlane-base/src/types/checkpoint_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub enum CheckpointSyncerConf {
S3 {
/// Bucket name
bucket: String,
/// Folder name inside bucket
folder: String,
/// S3 Region
region: Region,
},
Expand All @@ -41,6 +43,8 @@ pub enum RawCheckpointSyncerConf {
S3 {
/// Bucket name
bucket: Option<String>,
/// Folder name inside bucket - defaults to the root of the bucket (i.e. empty string)
folder: Option<String>,
/// S3 Region
region: Option<String>,
},
Expand Down Expand Up @@ -79,10 +83,13 @@ impl FromRawConf<'_, RawCheckpointSyncerConf> for CheckpointSyncerConf {
}
Ok(Self::LocalStorage { path })
}
RawCheckpointSyncerConf::S3 { bucket, region } => Ok(Self::S3 {
RawCheckpointSyncerConf::S3 { bucket, folder, region } => Ok(Self::S3 {
bucket: bucket
.ok_or_else(|| eyre!("Missing `bucket` for S3 checkpoint syncer"))
.into_config_result(|| cwp + "bucket")?,
folder: folder
.map_or(Ok::<String, Report>(String::from("")), Result::Ok)
.into_config_result(|| cwp + "folder")?,
region: region
.ok_or_else(|| eyre!("Missing `region` for S3 checkpoint syncer"))
.into_config_result(|| cwp + "region")?
Expand All @@ -106,13 +113,17 @@ impl FromStr for CheckpointSyncerConf {

match prefix {
"s3" => {
let [bucket, region]: [&str; 2] = suffix
let url_components = suffix
.split('/')
.collect::<Vec<_>>()
.try_into()
.map_err(|_| eyre!("Error parsing storage location; could not split bucket and region ({suffix})"))?;
.collect::<Vec<&str>>();
let [bucket, region, folder]: [&str; 3] = match url_components[..] {
[bucket, region] => Ok([bucket, region, ""]), // no folder means empty folder path
[bucket, region, folder] => Ok([bucket, region, folder]),
_ => Err(eyre!("Error parsing storage location; could not split bucket, region and folder ({suffix})"))
}?;
Ok(CheckpointSyncerConf::S3 {
bucket: bucket.into(),
folder: folder.into(),
region: region
.parse()
.context("Invalid region when parsing storage location")?,
Expand All @@ -136,8 +147,9 @@ impl CheckpointSyncerConf {
CheckpointSyncerConf::LocalStorage { path } => {
Box::new(LocalStorage::new(path.clone(), latest_index_gauge)?)
}
CheckpointSyncerConf::S3 { bucket, region } => Box::new(S3Storage::new(
CheckpointSyncerConf::S3 { bucket, folder, region } => Box::new(S3Storage::new(
bucket.clone(),
folder.clone(),
region.clone(),
latest_index_gauge,
)),
Expand Down
17 changes: 14 additions & 3 deletions rust/hyperlane-base/src/types/s3_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const S3_REQUEST_TIMEOUT_SECONDS: u64 = 30;
pub struct S3Storage {
/// The name of the bucket.
bucket: String,
/// A specific folder inside the above bucket - set to empty string to use the root of the bucket
folder: String,
/// The region of the bucket.
region: Region,
/// A client with AWS credentials.
Expand All @@ -44,6 +46,7 @@ impl fmt::Debug for S3Storage {
/// Formats the storage for debug output, listing only the `bucket`,
/// `folder` and `region` fields under the struct name `S3Storage`.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    let mut repr = f.debug_struct("S3Storage");
    repr.field("bucket", &self.bucket);
    repr.field("folder", &self.folder);
    repr.field("region", &self.region);
    repr.finish()
}
Expand All @@ -52,7 +55,7 @@ impl fmt::Debug for S3Storage {
impl S3Storage {
async fn write_to_bucket(&self, key: String, body: &str) -> Result<()> {
let req = PutObjectRequest {
key,
key: self.get_composite_key(key),
bucket: self.bucket.clone(),
body: Some(Vec::from(body).into()),
content_type: Some("application/json".to_owned()),
Expand All @@ -69,7 +72,7 @@ impl S3Storage {
/// Uses an anonymous client. This should only be used for publicly accessible buckets.
async fn anonymously_read_from_bucket(&self, key: String) -> Result<Option<Vec<u8>>> {
let req = GetObjectRequest {
key,
key: self.get_composite_key(key),
bucket: self.bucket.clone(),
..Default::default()
};
Expand Down Expand Up @@ -120,6 +123,14 @@ impl S3Storage {
})
}

/// Builds the full S3 object key for `key`, taking the configured folder
/// into account.
///
/// When `folder` is the empty string the key is returned unchanged, so the
/// object lives at the root of the bucket. Otherwise the result is
/// `<folder>/<key>`.
fn get_composite_key(&self, key: String) -> String {
    // The previous check was inverted: it dropped a configured folder and
    // produced a leading "/" when no folder was set.
    if self.folder.is_empty() {
        key
    } else {
        format!("{}/{}", self.folder, key)
    }
}

/// Returns the legacy object key for the checkpoint at `index`,
/// i.e. `checkpoint_<index>.json`.
fn legacy_checkpoint_key(index: u32) -> String {
    let mut object_key = String::from("checkpoint_");
    object_key.push_str(&index.to_string());
    object_key.push_str(".json");
    object_key
}
Expand Down Expand Up @@ -209,6 +220,6 @@ impl CheckpointSyncer for S3Storage {
}

fn announcement_location(&self) -> String {
format!("s3://{}/{}", self.bucket, self.region.name())
format!("s3://{}/{}/{}", self.bucket, self.region.name(), self.folder)
gbouv marked this conversation as resolved.
Show resolved Hide resolved
}
}