Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,9 @@ pub struct Processing {
/// Maximum rate limit to report to clients.
#[serde(default = "default_max_rate_limit")]
pub max_rate_limit: Option<u32>,
/// Configuration for attachment uploads.
#[serde(default)]
pub upload: UploadServiceConfig,
}

impl Default for Processing {
Expand All @@ -1191,6 +1194,7 @@ impl Default for Processing {
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
max_rate_limit: default_max_rate_limit(),
upload: UploadServiceConfig::default(),
}
}
}
Expand Down Expand Up @@ -1239,6 +1243,26 @@ impl Default for OutcomeAggregatorConfig {
}
}

/// Configuration values for attachment uploads.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct UploadServiceConfig {
/// Maximum concurrency of uploads.
pub max_concurrent_requests: usize,

/// Maximum duration of an attachment upload in seconds. Uploads that take longer are discarded.
pub timeout: u64,
}

impl Default for UploadServiceConfig {
fn default() -> Self {
Self {
max_concurrent_requests: 100,
timeout: 60,
}
}
}

/// Determines how to emit outcomes.
/// For compatibility reasons, this can either be true, false or AsClientReports
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -2493,6 +2517,11 @@ impl Config {
&self.values.processing.topics.unused
}

/// Configuration of the attachment upload service.
pub fn upload(&self) -> &UploadServiceConfig {
&self.values.processing.upload
}

/// Redis servers to connect to for project configs, cardinality limits,
/// rate limiting, and metrics metadata.
pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::{StoreService, StoreServicePool};
#[cfg(feature = "processing")]
use crate::services::upload::UploadService;
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -222,12 +224,14 @@ impl ServiceState {
let store = config
.processing_enabled()
.then(|| {
let upload = services.start(UploadService::new(config.upload()));
StoreService::create(
store_pool.clone(),
config.clone(),
global_config_handle.clone(),
outcome_aggregator.clone(),
metric_outcomes.clone(),
upload,
)
.map(|s| services.start(s))
})
Expand Down
8 changes: 5 additions & 3 deletions relay-server/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! Controller::run(|| Server::start())
//! .expect("failed to start relay");
//! ```
pub mod autoscaling;
pub mod buffer;
pub mod cogs;
pub mod global_config;
Expand All @@ -42,8 +43,9 @@ pub mod proxy_processor;
pub mod relays;
pub mod server;
pub mod stats;
pub mod upstream;

pub mod autoscaling;
#[cfg(feature = "processing")]
pub mod store;
#[cfg(feature = "processing")]
#[expect(unused)]
pub mod upload;
pub mod upstream;
5 changes: 5 additions & 0 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::services::upload::Upload;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

Expand Down Expand Up @@ -215,6 +216,8 @@ pub struct StoreService {
global_config: GlobalConfigHandle,
outcome_aggregator: Addr<TrackOutcome>,
metric_outcomes: MetricOutcomes,
#[expect(unused)]
upload: Addr<Upload>,
producer: Producer,
}

Expand All @@ -225,6 +228,7 @@ impl StoreService {
global_config: GlobalConfigHandle,
outcome_aggregator: Addr<TrackOutcome>,
metric_outcomes: MetricOutcomes,
upload: Addr<Upload>,
) -> anyhow::Result<Self> {
let producer = Producer::create(&config)?;
Ok(Self {
Expand All @@ -233,6 +237,7 @@ impl StoreService {
global_config,
outcome_aggregator,
metric_outcomes,
upload,
producer,
})
}
Expand Down
Loading