diff --git a/.env.example b/.env.example index 39132ab..2cc30d6 100644 --- a/.env.example +++ b/.env.example @@ -14,7 +14,7 @@ TIPS_INGRESS_METRICS_ADDR=0.0.0.0:9002 TIPS_INGRESS_BLOCK_TIME_MILLISECONDS=2000 TIPS_INGRESS_METER_BUNDLE_TIMEOUT_MS=2000 TIPS_INGRESS_MAX_BUFFERED_METER_BUNDLE_RESPONSES=100 -TIPS_INGRESS_BUILDER_RPC=http://localhost:2222 +TIPS_INGRESS_BUILDER_RPCS=http://localhost:2222 # Audit service configuration TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 7524109..83aa8a0 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -64,11 +64,12 @@ async fn main() -> anyhow::Result<()> { let (audit_tx, audit_rx) = mpsc::unbounded_channel::(); connect_audit_to_publisher(audit_rx, audit_publisher); - // TODO: when we have multiple builders we can make `builder_rx` mutable and do `.subscribe()` to have multiple consumers - // of this channel. - let (builder_tx, builder_rx) = + let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); - connect_ingress_to_builder(builder_rx, config.builder_rpc); + config.builder_rpcs.iter().for_each(|builder_rpc| { + let builder_rx = builder_tx.subscribe(); + connect_ingress_to_builder(builder_rx, builder_rpc.clone()); + }); let service = IngressService::new( provider, diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index e1b0564..970ab76 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -122,9 +122,9 @@ pub struct Config { )] pub meter_bundle_timeout_ms: u64, - /// URL of the builder RPC service for setting metering information - #[arg(long, env = "TIPS_INGRESS_BUILDER_RPC")] - pub builder_rpc: Url, + /// URLs of the builder RPC service for setting metering information + #[arg(long, env = "TIPS_INGRESS_BUILDER_RPCS")] + pub builder_rpcs: Vec, /// Maximum number of `MeterBundleResponse`s to buffer in memory #[arg(