diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index c6407bd..a8b02e4 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -16,7 +16,7 @@ categories = ["cryptography::cryptocurrencies", "web-programming"] asset_proxy = ["strum", "cid"] credits = ["kafka_internal", "rand", "strum", "toml"] kafka = ["kafka_internal"] -kafka_internal = ["rdkafka"] +kafka_internal = ["rand", "rdkafka"] solana = ["solana-client", "solana-sdk"] metrics = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-prometheus", "prometheus"] diff --git a/crates/core/src/producer.rs b/crates/core/src/producer.rs index bc9135c..4e45ff7 100644 --- a/crates/core/src/producer.rs +++ b/crates/core/src/producer.rs @@ -1,6 +1,12 @@ //! A Kafka record producer -use std::fmt; +use std::{ + fmt, + sync::atomic::{AtomicI64, AtomicUsize, Ordering}, +}; + +use rand::Rng; +use rdkafka::producer::Producer as _; use crate::{prelude::*, util::DebugShim}; @@ -23,11 +29,27 @@ impl Config { } } +#[derive(Debug)] +struct Shared { + partition_count: AtomicUsize, + last_partition_check: AtomicI64, +} + +impl Default for Shared { + fn default() -> Self { + Self { + partition_count: 1.into(), + last_partition_check: i64::MIN.into(), + } + } +} + /// A producer for emitting messages onto the Kafka topic identified by this /// service's name #[derive(Debug, Clone)] pub struct Producer { topic: String, + shared: Arc, producer: DebugShim, msg: PhantomData, } @@ -62,6 +84,7 @@ impl Producer { Ok(Self { topic: config.topic, + shared: Shared::default().into(), producer: DebugShim(producer), msg: PhantomData::default(), }) @@ -74,13 +97,46 @@ impl Producer { payload: Option<&M>, // TODO: don't wrap this in Option key: Option<&M::Key>, ) -> Result<(), SendError> { + const INTERVAL: i64 = 5 * 60; + + let now = chrono::Local::now().timestamp(); + let parts = if self + .shared + .last_partition_check + .fetch_update(Ordering::Release, Ordering::Acquire, |t| { + (t < (now - INTERVAL)).then_some(now) + }) + .is_ok() + { + match self + .producer + .0 + .client() + .fetch_metadata(Some(&self.topic), Duration::from_secs(5)) + { + Ok(m) => { + let topic = m.topics().iter().find(|t| t.name() == self.topic).unwrap(); + let parts = topic.partitions().len(); + self.shared.partition_count.store(parts, Ordering::Release); + Some(parts) + }, + Err(e) => { + warn!("Updating the partition count failed: {e}"); + None + }, + } + } else { + None + }; + let parts = parts.unwrap_or_else(|| self.shared.partition_count.load(Ordering::Relaxed)); + match self .producer .0 .send( rdkafka::producer::FutureRecord { topic: &self.topic, - partition: None, + partition: Some(rand::thread_rng().gen_range(0..parts).try_into().unwrap_or(0)), payload: payload.map(prost::Message::encode_to_vec).as_deref(), key: key.map(prost::Message::encode_to_vec).as_deref(), timestamp: None,