Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 91 additions & 30 deletions rust/cubestore/cubestore/src/remotefs/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub struct S3RemoteFs {
bucket: arc_swap::ArcSwap<Bucket>,
sub_path: Option<String>,
delete_mut: Mutex<()>,
/// When set, the refresh loop watches this file for changes and calls
/// STS AssumeRoleWithWebIdentity with the JWT inside it.
web_identity_token_file: Option<String>,
web_identity_role_arn: Option<String>,
}

impl fmt::Debug for S3RemoteFs {
Expand All @@ -50,36 +54,49 @@ impl S3RemoteFs {
bucket_name: String,
sub_path: Option<String>,
) -> Result<Arc<Self>, CubeError> {
// Incorrect naming for ENV variables...
let access_key = env::var("CUBESTORE_AWS_ACCESS_KEY_ID").ok();
let secret_key = env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok();
let token_file = env::var("CUBESTORE_AWS_WEB_IDENTITY_TOKEN_FILE").ok();
let role_arn = env::var("CUBESTORE_AWS_ROLE_ARN").ok();

let credentials = if let (Some(ref tf), Some(ref arn)) = (&token_file, &role_arn) {
// Web identity mode: read JWT from file and exchange via STS.
let jwt = std::fs::read_to_string(tf).map_err(|e| {
CubeError::internal(format!(
"Failed to read web identity token file '{}': {}",
tf, e
))
})?;
info!(
"Using web identity token file for S3 credentials (role={})",
arn
);
Credentials::from_sts(arn, "cubestore", &jwt).map_err(|e| {
CubeError::internal(format!("STS AssumeRoleWithWebIdentity failed: {}", e))
})?
} else {
// Static credentials mode (or credential chain fallback).
Credentials::new(
access_key.as_deref(),
secret_key.as_deref(),
None,
None,
None,
)
.map_err(|e| CubeError::internal(format!("Failed to create S3 credentials: {}", e)))?
};

let credentials = Credentials::new(
access_key.as_deref(),
secret_key.as_deref(),
None,
None,
None,
)
.map_err(|err| {
CubeError::internal(format!(
"Failed to create S3 credentials: {}",
err.to_string()
))
})?;
let region = region.parse::<Region>().map_err(|err| {
CubeError::internal(format!(
"Failed to parse Region '{}': {}",
region,
err.to_string()
))
let region = region.parse::<Region>().map_err(|e| {
CubeError::internal(format!("Failed to parse Region '{}': {}", region, e))
})?;
let bucket = Bucket::new(&bucket_name, region.clone(), credentials)?;
let fs = Arc::new(Self {
dir,
bucket: arc_swap::ArcSwap::new(Arc::new(bucket)),
sub_path,
delete_mut: Mutex::new(()),
web_identity_token_file: token_file,
web_identity_role_arn: role_arn,
});
spawn_creds_refresh_loop(access_key, secret_key, bucket_name, region, &fs);

Expand All @@ -94,15 +111,36 @@ fn spawn_creds_refresh_loop(
region: Region,
fs: &Arc<S3RemoteFs>,
) {
// Refresh credentials. TODO: use expiration time.
let refresh_every = refresh_interval_from_env();
let token_file = fs.web_identity_token_file.clone();
let role_arn = fs.web_identity_role_arn.clone();
let is_web_identity = token_file.is_some() && role_arn.is_some();

// Web identity STS credentials expire in ~1 hour, so poll the token file
// every 30s by default. Static credentials use 3-hour default.
// CUBESTORE_AWS_CREDS_REFRESH_EVERY_MINS overrides both.
let refresh_every = {
let configured = refresh_interval_from_env();
if is_web_identity && configured == Duration::from_secs(60 * 180) {
Duration::from_secs(30)
} else {
configured
}
};

if refresh_every.as_secs() == 0 {
return;
}

let fs = Arc::downgrade(fs);
let mut last_modified = token_file
.as_ref()
.and_then(|f| std::fs::metadata(f).ok()?.modified().ok());

std::thread::spawn(move || {
log::debug!("Started S3 credentials refresh loop");
log::debug!(
"Started S3 credentials refresh loop (web_identity={})",
is_web_identity
);
loop {
std::thread::sleep(refresh_every);
let fs = match fs.upgrade() {
Expand All @@ -112,13 +150,36 @@ fn spawn_creds_refresh_loop(
}
Some(fs) => fs,
};
let c = match Credentials::new(
access_key.as_deref(),
secret_key.as_deref(),
None,
None,
None,
) {

// In web identity mode, only refresh when the token file changed.
if let (Some(ref file), Some(_)) = (&token_file, &role_arn) {
let current_modified = std::fs::metadata(file).ok().and_then(|m| m.modified().ok());
if current_modified == last_modified {
continue;
}
last_modified = current_modified;
info!("Web identity token file changed, refreshing S3 credentials");
}

let c = if let (Some(ref file), Some(ref arn)) = (&token_file, &role_arn) {
match std::fs::read_to_string(file) {
Ok(jwt) => Credentials::from_sts(arn, "cubestore", &jwt),
Err(e) => {
log::error!("Failed to read web identity token file: {}", e);
continue;
}
}
} else {
Credentials::new(
access_key.as_deref(),
secret_key.as_deref(),
None,
None,
None,
)
};

let c = match c {
Ok(c) => c,
Err(e) => {
log::error!("Failed to refresh S3 credentials: {}", e);
Expand Down
Loading