Skip to content

Commit

Permalink
Try to submit messages to any available partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
ray-kast committed Oct 23, 2023
1 parent 8083d1a commit b61a335
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ categories = ["cryptography::cryptocurrencies", "web-programming"]
asset_proxy = ["strum", "cid"]
credits = ["kafka_internal", "rand", "strum", "toml"]
kafka = ["kafka_internal"]
kafka_internal = ["rdkafka"]
kafka_internal = ["rand", "rdkafka"]
solana = ["solana-client", "solana-sdk"]
metrics = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-prometheus", "prometheus"]

Expand Down
60 changes: 58 additions & 2 deletions crates/core/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! A Kafka record producer

use std::fmt;
use std::{
fmt,
sync::atomic::{AtomicI64, AtomicUsize, Ordering},
};

use rand::Rng;
use rdkafka::producer::Producer as _;

use crate::{prelude::*, util::DebugShim};

Expand All @@ -23,11 +29,27 @@ impl Config {
}
}

/// Partition-tracking state shared (via `Arc`) between all clones of a
/// [`Producer`], so one clone's metadata refresh benefits the others.
#[derive(Debug)]
struct Shared {
    // Last-known number of partitions for the topic; used as the upper
    // bound when picking a random partition to send to.
    partition_count: AtomicUsize,
    // Unix timestamp (seconds) of the most recent partition-count refresh.
    last_partition_check: AtomicI64,
}

impl Default for Shared {
    fn default() -> Self {
        Self {
            // Every topic has at least one partition, so 1 is a safe
            // fallback until real metadata has been fetched.
            partition_count: AtomicUsize::new(1),
            // i64::MIN guarantees the first send sees the check interval
            // as expired and immediately refreshes the metadata.
            last_partition_check: AtomicI64::new(i64::MIN),
        }
    }
}

/// A producer for emitting messages onto the Kafka topic identified by this
/// service's name
#[derive(Debug, Clone)]
pub struct Producer<M> {
    // Name of the Kafka topic this producer sends to.
    topic: String,
    // Partition metadata shared between clones of this producer; see `Shared`.
    shared: Arc<Shared>,
    // Underlying rdkafka async producer; wrapped in DebugShim, presumably so
    // the non-Debug rdkafka type can live in a #[derive(Debug)] struct —
    // TODO(review): confirm against DebugShim's definition.
    producer: DebugShim<rdkafka::producer::FutureProducer>,
    // Marks the message type `M` handled by this producer without storing
    // a value of it.
    msg: PhantomData<fn(&M)>,
}
Expand Down Expand Up @@ -62,6 +84,7 @@ impl<M: Message> Producer<M> {

Ok(Self {
topic: config.topic,
shared: Shared::default().into(),
producer: DebugShim(producer),
msg: PhantomData::default(),
})
Expand All @@ -74,13 +97,46 @@ impl<M: Message> Producer<M> {
payload: Option<&M>, // TODO: don't wrap this in Option
key: Option<&M::Key>,
) -> Result<(), SendError> {
const INTERVAL: i64 = 5 * 60;

let now = chrono::Local::now().timestamp();
let parts = if self
.shared
.last_partition_check
.fetch_update(Ordering::Release, Ordering::Acquire, |t| {
(t < (now - INTERVAL)).then_some(now)
})
.is_ok()
{
match self
.producer
.0
.client()
.fetch_metadata(Some(&self.topic), Duration::from_secs(5))
{
Ok(m) => {
let topic = m.topics().iter().find(|t| t.name() == self.topic).unwrap();
let parts = topic.partitions().len();
self.shared.partition_count.store(parts, Ordering::Release);
Some(parts)
},
Err(e) => {
warn!("Updating the partition count failed: {e}");
None
},
}
} else {
None
};
let parts = parts.unwrap_or_else(|| self.shared.partition_count.load(Ordering::Relaxed));

match self
.producer
.0
.send(
rdkafka::producer::FutureRecord {
topic: &self.topic,
partition: None,
partition: Some(rand::thread_rng().gen_range(0..parts).try_into().unwrap_or(0)),
payload: payload.map(prost::Message::encode_to_vec).as_deref(),
key: key.map(prost::Message::encode_to_vec).as_deref(),
timestamp: None,
Expand Down

0 comments on commit b61a335

Please sign in to comment.