
Commit 864e19a

committed
chore: kafka bundle consumer / clean up
1 parent b28f946 commit 864e19a

File tree

12 files changed, +151 -99 lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default.

crates/audit/src/bin/main.rs

Lines changed: 5 additions & 26 deletions
@@ -5,10 +5,10 @@ use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
 use clap::{Parser, ValueEnum};
 use rdkafka::consumer::Consumer;
 use tips_audit::{
-    KafkaMempoolArchiver, KafkaMempoolReader, S3EventReaderWriter, create_kafka_consumer,
+    KafkaAuditLogReader, KafkaMempoolArchiver, S3EventReaderWriter, create_kafka_consumer,
 };
-use tracing::{info, warn};
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tips_core::logger::init_logger;
+use tracing::info;
 
 #[derive(Debug, Clone, ValueEnum)]
 enum S3ConfigType {
@@ -53,28 +53,7 @@ async fn main() -> Result<()> {
 
     let args = Args::parse();
 
-    let log_level = match args.log_level.to_lowercase().as_str() {
-        "trace" => tracing::Level::TRACE,
-        "debug" => tracing::Level::DEBUG,
-        "info" => tracing::Level::INFO,
-        "warn" => tracing::Level::WARN,
-        "error" => tracing::Level::ERROR,
-        _ => {
-            warn!(
-                "Invalid log level '{}', defaulting to 'info'",
-                args.log_level
-            );
-            tracing::Level::INFO
-        }
-    };
-
-    tracing_subscriber::registry()
-        .with(
-            tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())),
-        )
-        .with(tracing_subscriber::fmt::layer())
-        .init();
+    init_logger(&args.log_level);
 
     info!(
         kafka_properties_file = %args.kafka_properties_file,
@@ -86,7 +65,7 @@ async fn main() -> Result<()> {
     let consumer = create_kafka_consumer(&args.kafka_properties_file)?;
     consumer.subscribe(&[&args.kafka_topic])?;
 
-    let reader = KafkaMempoolReader::new(consumer, args.kafka_topic.clone())?;
+    let reader = KafkaAuditLogReader::new(consumer, args.kafka_topic.clone())?;
 
     let s3_client = create_s3_client(&args).await?;
     let s3_bucket = args.s3_bucket.clone();
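
The deleted setup is consolidated into tips_core::logger::init_logger. The logger module itself is not part of this diff; a minimal sketch of such a helper, mirroring the removed block (names and behavior are assumptions), could look like:

// Hypothetical sketch of crates/core/src/logger.rs; the real module is not shown in this commit.
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub fn init_logger(log_level: &str) {
    // Map the CLI string to a tracing level, falling back to INFO on unknown input.
    let level = match log_level.to_lowercase().as_str() {
        "trace" => tracing::Level::TRACE,
        "debug" => tracing::Level::DEBUG,
        "info" => tracing::Level::INFO,
        "warn" => tracing::Level::WARN,
        "error" => tracing::Level::ERROR,
        _ => tracing::Level::INFO,
    };

    // Respect RUST_LOG if set, otherwise use the requested level.
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level.to_string())),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
}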

crates/audit/src/reader.rs

Lines changed: 8 additions & 26 deletions
@@ -7,36 +7,18 @@ use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     message::Message,
 };
-use std::fs;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tips_core::kafka::load_kafka_config_from_file;
 use tokio::time::sleep;
-use tracing::{debug, error, info};
+use tracing::{debug, error};
 
 pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result<StreamConsumer> {
-    let client_config = load_kafka_config_from_file(kafka_properties_file)?;
+    let client_config =
+        ClientConfig::from_iter(load_kafka_config_from_file(kafka_properties_file)?);
     let consumer: StreamConsumer = client_config.create()?;
     Ok(consumer)
 }
 
-fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
-    let kafka_properties = fs::read_to_string(properties_file_path)?;
-    info!("Kafka properties:\n{}", kafka_properties);
-
-    let mut client_config = ClientConfig::new();
-
-    for line in kafka_properties.lines() {
-        let line = line.trim();
-        if line.is_empty() || line.starts_with('#') {
-            continue;
-        }
-        if let Some((key, value)) = line.split_once('=') {
-            client_config.set(key.trim(), value.trim());
-        }
-    }
-
-    Ok(client_config)
-}
-
 pub fn assign_topic_partition(consumer: &StreamConsumer, topic: &str) -> Result<()> {
     let mut tpl = TopicPartitionList::new();
     tpl.add_partition(topic, 0);
@@ -57,14 +39,14 @@ pub trait EventReader {
     async fn commit(&mut self) -> Result<()>;
 }
 
-pub struct KafkaMempoolReader {
+pub struct KafkaAuditLogReader {
     consumer: StreamConsumer,
     topic: String,
     last_message_offset: Option<i64>,
     last_message_partition: Option<i32>,
 }
 
-impl KafkaMempoolReader {
+impl KafkaAuditLogReader {
     pub fn new(consumer: StreamConsumer, topic: String) -> Result<Self> {
         consumer.subscribe(&[&topic])?;
         Ok(Self {
@@ -77,7 +59,7 @@ impl KafkaMempoolReader {
 }
 
 #[async_trait]
-impl EventReader for KafkaMempoolReader {
+impl EventReader for KafkaAuditLogReader {
     async fn read_event(&mut self) -> Result<Event> {
         match self.consumer.recv().await {
             Ok(message) => {
@@ -143,7 +125,7 @@ impl EventReader for KafkaMempoolReader {
     }
 }
 
-impl KafkaMempoolReader {
+impl KafkaAuditLogReader {
     pub fn topic(&self) -> &str {
         &self.topic
     }

crates/audit/tests/integration_tests.rs

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 use std::time::Duration;
 use tips_audit::{
-    KafkaMempoolArchiver, KafkaMempoolReader,
+    KafkaAuditLogReader, KafkaMempoolArchiver,
     publisher::{BundleEventPublisher, KafkaBundleEventPublisher},
     storage::{BundleEventS3Reader, S3EventReaderWriter},
     types::{BundleEvent, DropReason},
@@ -38,7 +38,7 @@ async fn test_kafka_publisher_s3_archiver_integration()
     }
 
     let mut consumer = KafkaMempoolArchiver::new(
-        KafkaMempoolReader::new(harness.kafka_consumer, topic.to_string())?,
+        KafkaAuditLogReader::new(harness.kafka_consumer, topic.to_string())?,
         s3_writer.clone(),
     );
 

crates/bundle-pool/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,8 @@ tracing.workspace = true
 tokio.workspace = true
 anyhow.workspace = true
 async-trait.workspace = true
+rdkafka.workspace = true
+serde_json.workspace = true
 
 [dev-dependencies]
 tips-core = { workspace = true, features = ["test-utils"] }

crates/bundle-pool/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -1,4 +1,6 @@
 pub mod pool;
+pub mod source;
 
 pub use pool::{BundleStore, InMemoryBundlePool};
+pub use source::KafkaBundleSource;
 pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};

crates/bundle-pool/src/source.rs

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+use anyhow::Result;
+use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::{ClientConfig, Message};
+use tips_core::{Bundle, BundleWithMetadata};
+use tokio::sync::mpsc;
+use tracing::{debug, error};
+
+pub struct KafkaBundleSource {
+    queue_consumer: StreamConsumer,
+    publisher: mpsc::UnboundedSender<BundleWithMetadata>,
+}
+
+impl KafkaBundleSource {
+    pub fn new(
+        client_config: ClientConfig,
+        topic: String,
+        publisher: mpsc::UnboundedSender<BundleWithMetadata>,
+    ) -> Result<Self> {
+        let queue_consumer: StreamConsumer = client_config.create()?;
+        queue_consumer.subscribe(&[topic.as_str()])?;
+        Ok(Self {
+            queue_consumer,
+            publisher,
+        })
+    }
+
+    pub async fn run(&self) -> Result<()> {
+        loop {
+            match self.queue_consumer.recv().await {
+                Ok(message) => {
+                    let payload = match message.payload() {
+                        Some(p) => p,
+                        None => {
+                            error!("Message has no payload");
+                            continue;
+                        }
+                    };
+
+                    let bundle: Bundle = match serde_json::from_slice(payload) {
+                        Ok(b) => b,
+                        Err(e) => {
+                            error!(error = %e, "Failed to deserialize bundle");
+                            continue;
+                        }
+                    };
+
+                    debug!(
+                        bundle = ?bundle,
+                        offset = message.offset(),
+                        partition = message.partition(),
+                        "Received bundle from Kafka"
+                    );
+
+                    let bundle_with_metadata = match BundleWithMetadata::load(bundle) {
+                        Ok(b) => b,
+                        Err(e) => {
+                            error!(error = %e, "Failed to load bundle");
+                            continue;
+                        }
+                    };
+
+                    if let Err(e) = self.publisher.send(bundle_with_metadata) {
+                        error!(error = ?e, "Failed to publish bundle to queue");
+                    }
+                }
+                Err(e) => {
+                    error!(error = %e, "Error receiving message from Kafka");
+                }
+            }
+        }
+    }
+}
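
A possible way to wire the new source into a service, assuming the crate is imported as tips_bundle_pool and that a properties file and topic name exist (all names here are illustrative, not taken from this commit):

// Hypothetical wiring sketch; crate path, file path, and topic name are assumptions.
use rdkafka::ClientConfig;
use tips_bundle_pool::KafkaBundleSource;
use tips_core::kafka::load_kafka_config_from_file;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = ClientConfig::from_iter(load_kafka_config_from_file("kafka.properties")?);

    // Bundles decoded by the source arrive on this channel.
    let (tx, mut rx) = mpsc::unbounded_channel();
    let source = KafkaBundleSource::new(config, "bundles".to_string(), tx)?;

    // Run the Kafka consumer loop in the background.
    tokio::spawn(async move { source.run().await });

    while let Some(bundle_with_metadata) = rx.recv().await {
        // Hand the bundle off to the pool (e.g. InMemoryBundlePool) here.
        let _ = bundle_with_metadata;
    }
    Ok(())
}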

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ alloy-serde = { version = "1.0.41", default-features = false }
 alloy-signer-local = { workspace = true, optional = true }
 op-alloy-rpc-types = { workspace = true, optional = true }
 tracing.workspace = true
+tracing-subscriber.workspace = true
 
 [dev-dependencies]
 alloy-signer-local.workspace = true

crates/core/src/kafka.rs

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+use std::collections::HashMap;
+use std::fs;
+
+pub fn load_kafka_config_from_file(
+    properties_file_path: &str,
+) -> Result<HashMap<String, String>, std::io::Error> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+
+    let mut config = HashMap::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            config.insert(key.trim().to_string(), value.trim().to_string());
+        }
+    }
+
+    Ok(config)
+}
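
The helper parses plain key=value properties files, skipping blank lines and # comments. A small usage sketch, mirroring how the audit reader now builds its consumer config (the file path is illustrative):

// Hypothetical usage; "kafka.properties" is an assumed path. Given contents such as
//     bootstrap.servers=localhost:9092
//     group.id=tips-consumer
// the parsed map can seed an rdkafka ClientConfig via from_iter.
use rdkafka::ClientConfig;
use tips_core::kafka::load_kafka_config_from_file;

fn consumer_config(path: &str) -> anyhow::Result<ClientConfig> {
    let props = load_kafka_config_from_file(path)?;
    Ok(ClientConfig::from_iter(props))
}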

crates/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,5 @@
+pub mod kafka;
+pub mod logger;
 pub mod types;
 
 #[cfg(any(test, feature = "test-utils"))]
