Collaborator

@bbalser bbalser commented Sep 15, 2025

An issue was identified that occurs when several instances of aws_sdk_s3::Client have been created and the credentials are refreshed.
The refresh sometimes fails with an error saying no credentials were found. The best fix identified was to create a single aws_sdk_s3::Client and clone it rather than creating a new one per bucket.

To address this, the FileStore struct was deleted and all of its low-level methods have been moved to free functions in the file_store crate. These low-level functions now take a file_store::Client and a bucket as arguments.
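
For readers unfamiliar with the pattern, here is a minimal sketch of "one client, cloned per use" combined with free functions that take a client and a bucket. It uses only the standard library: `Client`, `ClientInner`, `handle_count`, and `describe_target` are hypothetical stand-ins, not the real aws_sdk_s3 or file_store API, and the sketch assumes the real client behaves like a cheaply cloneable handle over shared state.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a cheaply cloneable S3 client handle.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<ClientInner>,
}

#[derive(Debug)]
struct ClientInner {
    endpoint: String,
}

impl Client {
    /// Build the one client that the whole process shares.
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            inner: Arc::new(ClientInner {
                endpoint: endpoint.into(),
            }),
        }
    }

    /// Number of handles currently pointing at the same shared state.
    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}

/// Hypothetical free function: the bucket is an argument, not client state.
pub fn describe_target(client: &Client, bucket: &str) -> String {
    format!("{}/{}", client.inner.endpoint, bucket)
}

fn main() {
    let client = Client::new("http://localhost:4566");

    // Clone the single client for each consumer instead of building new ones.
    let ingest_client = client.clone();
    let output_client = client.clone();

    println!("{}", describe_target(&ingest_client, "ingest"));
    println!("{}", describe_target(&output_client, "output"));
    println!("shared handles: {}", client.handle_count());
}
```

Because every clone points at the same inner state, there is only one place where credentials would be cached and refreshed, which is the property the fix relies on.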

@bbalser bbalser marked this pull request as ready for review September 16, 2025 14:48
pub async fn run(&self, settings: &Settings) -> Result {
let store = FileStore::from_settings(settings).await?;
let mut file_infos = self.filter.list(&store);
let client = settings.connect().await;
Member

@kurotych kurotych Sep 17, 2025

To me, having a connect function in the settings module looks awkward.
I think it'd be nice to move it out of the Settings module.

Collaborator Author

I was following the pattern we have established with db_store. I thought it would feel good for both to follow the same pattern.

}

impl<Message, State, Parser> ManagedTask for FileInfoPollerServer<Message, State, FileStore, Parser>
impl<Message, State, Parser> ManagedTask
Contributor

I don't remember why I hardcoded FileStore in here. But maybe we want to make this generic over Store like the impl block below.

That way someone could still use task-manager if they decide to implement FileInfoPollerStore themselves.
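
A rough sketch of what "generic over Store" could look like, using standard-library-only stand-ins. The trait bodies, the `start` method, and `InMemoryStore` are hypothetical simplifications; the real `ManagedTask` and `FileInfoPollerStore` traits in the repository have different (async) signatures and more type parameters.

```rust
/// Hypothetical, simplified stand-in for the FileInfoPollerStore trait.
pub trait FileInfoPollerStore {
    fn list(&self) -> Vec<String>;
}

/// Hypothetical, simplified stand-in for the task-manager trait.
pub trait ManagedTask {
    fn start(&self) -> usize;
}

pub struct FileInfoPollerServer<Store> {
    pub store: Store,
}

// Implemented for any Store that satisfies the trait bound,
// instead of being hardcoded to one concrete store type.
impl<Store> ManagedTask for FileInfoPollerServer<Store>
where
    Store: FileInfoPollerStore,
{
    fn start(&self) -> usize {
        // Stand-in for real polling work: count the files the store reports.
        self.store.list().len()
    }
}

/// A user-defined store, to show the server still works as a ManagedTask.
pub struct InMemoryStore(pub Vec<String>);

impl FileInfoPollerStore for InMemoryStore {
    fn list(&self) -> Vec<String> {
        self.0.clone()
    }
}

fn main() {
    let server = FileInfoPollerServer {
        store: InMemoryStore(vec!["a.gz".to_string(), "b.gz".to_string()]),
    };
    println!("files seen: {}", server.start());
}
```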

Collaborator Author

Addressed eabefc1

Comment on lines 74 to 75
.map(|msg| (self.client.clone(), self.bucket.clone(), msg))
.for_each_concurrent(5, |(client, bucket, path)| async move {
Contributor

Potential for future updates.

This function takes self, so we can grab references instead of cloning.

let client = &self.client;
let bucket = &self.bucket;

That would allow us to remove the .map() call.
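
To illustrate the suggestion, here is a sketch that contrasts the two shapes. It substitutes a plain synchronous iterator for the stream and `for_each_concurrent`, so it only shows the ownership difference, not the concurrency. `Deleter`, `delete_file`, `run_cloning`, and `run_borrowing` are hypothetical names, and `Client` is a stand-in type.

```rust
/// Hypothetical stand-in for the shared client.
#[derive(Clone, Debug)]
pub struct Client {
    pub name: String,
}

pub struct Deleter {
    pub client: Client,
    pub bucket: String,
}

/// Hypothetical per-file operation that only needs borrowed inputs.
fn delete_file(client: &Client, bucket: &str, path: &str) -> String {
    format!("{} deleted {}/{}", client.name, bucket, path)
}

impl Deleter {
    /// Before: every item is paired with owned clones via an extra map().
    pub fn run_cloning(&self, paths: Vec<String>) -> Vec<String> {
        paths
            .into_iter()
            .map(|path| (self.client.clone(), self.bucket.clone(), path))
            .map(|(client, bucket, path)| delete_file(&client, &bucket, &path))
            .collect()
    }

    /// After: borrow once up front; the pairing map() disappears.
    pub fn run_borrowing(&self, paths: Vec<String>) -> Vec<String> {
        let client = &self.client;
        let bucket = &self.bucket;
        paths
            .into_iter()
            .map(|path| delete_file(client, bucket, &path))
            .collect()
    }
}

fn main() {
    let deleter = Deleter {
        client: Client { name: "s3".to_string() },
        bucket: "ingest".to_string(),
    };
    let paths = vec!["a.gz".to_string(), "b.gz".to_string()];
    for line in deleter.run_borrowing(paths) {
        println!("{line}");
    }
}
```

In the async version, shared references are `Copy`, so an `async move` block can capture `client` and `bucket` by value without cloning the underlying data, provided `self` outlives the stream.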

Collaborator Author

refactored eabefc1


pub fn list_files<A, B>(
client: &Client,
bucket: &str,
Contributor

Similar to FileInfoPollerConfigBuilder::file_store(client, bucket), it may be nice to mirror AWS's bucket method and accept Into<String> for all these functions.

pub fn bucket(mut self, input: impl ::std::convert::Into<::std::string::String>) -> Self {
    self.inner = self.inner.bucket(input.into());
    self
}
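
As a small illustration of what accepting `Into<String>` buys callers, here is a standalone sketch. `ListRequest` and `list_request` are hypothetical; they are not the signatures that ended up in the file_store crate.

```rust
/// Hypothetical request type that owns its bucket and prefix.
#[derive(Debug, PartialEq)]
pub struct ListRequest {
    pub bucket: String,
    pub prefix: String,
}

/// Accepts anything convertible into a String, so callers can pass
/// string literals or owned Strings without converting at the call site.
pub fn list_request(bucket: impl Into<String>, prefix: impl Into<String>) -> ListRequest {
    ListRequest {
        bucket: bucket.into(),
        prefix: prefix.into(),
    }
}

fn main() {
    // A &str literal works directly.
    let from_literals = list_request("ingest", "reports/");

    // An owned String is moved in without an extra allocation.
    let bucket = String::from("ingest");
    let from_owned = list_request(bucket, String::from("reports/"));

    println!("{:?}", from_literals);
    println!("same request: {}", from_literals == from_owned);
}
```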

Collaborator Author

good call, refactored beffa76

Ok(stream) => stream_source(stream),
Err(err) => stream::once(async move { Err(err) }).boxed(),
})
// .fuse()
Contributor

Why was this .fuse() removed?

Collaborator Author

mistake, was testing something and forgot to uncomment

Comment on lines +30 to +34
crate::new_client(
self.endpoint.clone(),
self.access_key_id.clone(),
self.secret_access_key.clone(),
)
Contributor

Our production happy path is to call this function with none of these settings provided, which makes me feel like we're missing an abstraction. I'm not sure what yet, but I'll be thinking about it.

Collaborator Author

It is a fair point. In our production settings we inherit everything needed to connect to S3 from the environment. These properties are only necessary when doing local testing.
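
One possible way to make that split explicit, offered only as a sketch and not as what the PR does: model "resolve from the environment" and "explicit local-testing values" as distinct cases. Everything here (`Settings` as shown, `Credentials`, `ClientPlan`, `client_plan`) is hypothetical, and no real SDK calls are made.

```rust
/// Hypothetical settings: every field is optional, matching the discussion above.
#[derive(Debug, Clone, Default)]
pub struct Settings {
    pub endpoint: Option<String>,
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
}

/// Hypothetical description of where credentials should come from.
#[derive(Debug, PartialEq)]
pub enum Credentials {
    /// Production path: nothing configured, defer to the environment.
    FromEnvironment,
    /// Local-testing path: both keys were supplied explicitly.
    Static {
        access_key_id: String,
        secret_access_key: String,
    },
}

/// Hypothetical plan for building a client; no real client is created here.
#[derive(Debug, PartialEq)]
pub struct ClientPlan {
    pub endpoint: Option<String>,
    pub credentials: Credentials,
}

impl Settings {
    pub fn client_plan(&self) -> ClientPlan {
        // Use static credentials only when both halves are present.
        let credentials = match (&self.access_key_id, &self.secret_access_key) {
            (Some(id), Some(secret)) => Credentials::Static {
                access_key_id: id.clone(),
                secret_access_key: secret.clone(),
            },
            _ => Credentials::FromEnvironment,
        };
        ClientPlan {
            endpoint: self.endpoint.clone(),
            credentials,
        }
    }
}

fn main() {
    // Production-style settings: nothing provided.
    println!("{:?}", Settings::default().client_plan());

    // Local-testing settings: explicit endpoint and keys.
    let local = Settings {
        endpoint: Some("http://localhost:4566".to_string()),
        access_key_id: Some("test".to_string()),
        secret_access_key: Some("test".to_string()),
    };
    println!("{:?}", local.client_plan());
}
```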

use std::{path::Path, time::Duration};

#[derive(Debug, Deserialize, Clone)]
pub struct Buckets {
Contributor

This is nice.

// Install the prometheus metrics exporter
poc_metrics::start_metrics(&settings.metrics)?;

let s3_client = settings.file_store.connect().await;
Contributor

Everywhere else this is file_store_client.

Collaborator Author

fixed 6afada7

@bbalser bbalser force-pushed the bbalser/file-store-refactor branch from 5046c0a to 14851d3 Compare September 18, 2025 11:27
@bbalser bbalser merged commit bd88e65 into main Sep 18, 2025
57 checks passed
@bbalser bbalser deleted the bbalser/file-store-refactor branch September 18, 2025 12:36
4 participants