6 changes: 6 additions & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

4 changes: 2 additions & 2 deletions crates/audit/src/archiver.rs
@@ -5,7 +5,7 @@ use std::time::Duration;
use tokio::time::sleep;
use tracing::{error, info};

pub struct KafkaMempoolArchiver<R, W>
pub struct KafkaAuditArchiver<R, W>
where
R: EventReader,
W: EventWriter,
@@ -14,7 +14,7 @@ where
writer: W,
}

impl<R, W> KafkaMempoolArchiver<R, W>
impl<R, W> KafkaAuditArchiver<R, W>
where
R: EventReader,
W: EventWriter,
33 changes: 6 additions & 27 deletions crates/audit/src/bin/main.rs
@@ -5,10 +5,10 @@ use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
use clap::{Parser, ValueEnum};
use rdkafka::consumer::Consumer;
use tips_audit::{
KafkaMempoolArchiver, KafkaMempoolReader, S3EventReaderWriter, create_kafka_consumer,
KafkaAuditArchiver, KafkaAuditLogReader, S3EventReaderWriter, create_kafka_consumer,
};
use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tips_core::logger::init_logger;
use tracing::info;

#[derive(Debug, Clone, ValueEnum)]
enum S3ConfigType {
@@ -53,28 +53,7 @@ async fn main() -> Result<()> {

let args = Args::parse();

let log_level = match args.log_level.to_lowercase().as_str() {
"trace" => tracing::Level::TRACE,
"debug" => tracing::Level::DEBUG,
"info" => tracing::Level::INFO,
"warn" => tracing::Level::WARN,
"error" => tracing::Level::ERROR,
_ => {
warn!(
"Invalid log level '{}', defaulting to 'info'",
args.log_level
);
tracing::Level::INFO
}
};

tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())),
)
.with(tracing_subscriber::fmt::layer())
.init();
init_logger(&args.log_level);

info!(
kafka_properties_file = %args.kafka_properties_file,
@@ -86,13 +65,13 @@
let consumer = create_kafka_consumer(&args.kafka_properties_file)?;
consumer.subscribe(&[&args.kafka_topic])?;

let reader = KafkaMempoolReader::new(consumer, args.kafka_topic.clone())?;
let reader = KafkaAuditLogReader::new(consumer, args.kafka_topic.clone())?;

let s3_client = create_s3_client(&args).await?;
let s3_bucket = args.s3_bucket.clone();
let writer = S3EventReaderWriter::new(s3_client, s3_bucket);

let mut archiver = KafkaMempoolArchiver::new(reader, writer);
let mut archiver = KafkaAuditArchiver::new(reader, writer);

info!("Audit archiver initialized, starting main loop");

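The hand-rolled log-level parsing and subscriber setup above are replaced by a single call to the shared `tips_core::logger::init_logger` helper. Its implementation is not part of this diff; a minimal sketch, assuming it mirrors the removed code:

```rust
// Hypothetical sketch of tips_core::logger::init_logger; the real helper
// lives in tips_core and is not shown in this PR.
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub fn init_logger(log_level: &str) {
    // Unrecognized level strings fall back to "info", as the removed code did.
    let level = match log_level.to_lowercase().as_str() {
        "trace" | "debug" | "info" | "warn" | "error" => log_level.to_lowercase(),
        _ => "info".to_string(),
    };

    // An explicit RUST_LOG-style env filter still takes precedence when set.
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level)),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
}
```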
17 changes: 17 additions & 0 deletions crates/audit/src/lib.rs
@@ -4,8 +4,25 @@ pub mod reader;
pub mod storage;
pub mod types;

use tokio::sync::mpsc;
use tracing::error;

pub use archiver::*;
pub use publisher::*;
pub use reader::*;
pub use storage::*;
pub use types::*;

pub fn connect_audit_to_publisher<P>(event_rx: mpsc::UnboundedReceiver<BundleEvent>, publisher: P)
where
P: BundleEventPublisher + 'static,
{
tokio::spawn(async move {
let mut event_rx = event_rx;
while let Some(event) = event_rx.recv().await {
if let Err(e) = publisher.publish(event).await {
error!(error = %e, "Failed to publish bundle event");
}
}
});
}
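`connect_audit_to_publisher` spawns a background task that drains an unbounded channel of `BundleEvent`s into any `BundleEventPublisher`, logging publish failures rather than propagating them. A usage sketch, assuming a publisher instance is constructed elsewhere and that this runs inside a Tokio runtime:

```rust
// Illustrative wiring only: `publisher` stands in for a concrete
// BundleEventPublisher (e.g. the Kafka-backed one); its constructor is not
// shown in this diff.
use tips_audit::{BundleEvent, BundleEventPublisher, connect_audit_to_publisher};
use tokio::sync::mpsc;

fn wire_audit<P>(publisher: P) -> mpsc::UnboundedSender<BundleEvent>
where
    P: BundleEventPublisher + 'static,
{
    let (event_tx, event_rx) = mpsc::unbounded_channel();
    // Must be called from within a Tokio runtime, since it spawns a task.
    connect_audit_to_publisher(event_rx, publisher);
    // Components that emit audit events keep the sender and push into it.
    event_tx
}
```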
34 changes: 8 additions & 26 deletions crates/audit/src/reader.rs
@@ -7,36 +7,18 @@ use rdkafka::{
consumer::{Consumer, StreamConsumer},
message::Message,
};
use std::fs;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tips_core::kafka::load_kafka_config_from_file;
use tokio::time::sleep;
use tracing::{debug, error, info};
use tracing::{debug, error};

pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result<StreamConsumer> {
let client_config = load_kafka_config_from_file(kafka_properties_file)?;
let client_config =
ClientConfig::from_iter(load_kafka_config_from_file(kafka_properties_file)?);
let consumer: StreamConsumer = client_config.create()?;
Ok(consumer)
}

fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
let kafka_properties = fs::read_to_string(properties_file_path)?;
info!("Kafka properties:\n{}", kafka_properties);

let mut client_config = ClientConfig::new();

for line in kafka_properties.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((key, value)) = line.split_once('=') {
client_config.set(key.trim(), value.trim());
}
}

Ok(client_config)
}

pub fn assign_topic_partition(consumer: &StreamConsumer, topic: &str) -> Result<()> {
let mut tpl = TopicPartitionList::new();
tpl.add_partition(topic, 0);
@@ -57,14 +39,14 @@ pub trait EventReader {
async fn commit(&mut self) -> Result<()>;
}

pub struct KafkaMempoolReader {
pub struct KafkaAuditLogReader {
consumer: StreamConsumer,
topic: String,
last_message_offset: Option<i64>,
last_message_partition: Option<i32>,
}

impl KafkaMempoolReader {
impl KafkaAuditLogReader {
pub fn new(consumer: StreamConsumer, topic: String) -> Result<Self> {
consumer.subscribe(&[&topic])?;
Ok(Self {
@@ -77,7 +59,7 @@ impl KafkaMempoolReader {
}

#[async_trait]
impl EventReader for KafkaMempoolReader {
impl EventReader for KafkaAuditLogReader {
async fn read_event(&mut self) -> Result<Event> {
match self.consumer.recv().await {
Ok(message) => {
@@ -143,7 +125,7 @@ impl EventReader for KafkaMempoolReader {
}
}

impl KafkaMempoolReader {
impl KafkaAuditLogReader {
pub fn topic(&self) -> &str {
&self.topic
}
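The in-crate properties parser is deleted in favor of `tips_core::kafka::load_kafka_config_from_file`, and the consumer now builds its config with `ClientConfig::from_iter` over the helper's result. The shared helper's definition is outside this diff; a sketch of what it plausibly looks like, assuming it returns key/value pairs:

```rust
// Hypothetical sketch of tips_core::kafka::load_kafka_config_from_file.
// The call site wraps the result in ClientConfig::from_iter, which suggests
// it yields key/value pairs rather than a ready-made ClientConfig.
use anyhow::Result;
use std::fs;

pub fn load_kafka_config_from_file(properties_file_path: &str) -> Result<Vec<(String, String)>> {
    let kafka_properties = fs::read_to_string(properties_file_path)?;

    let mut pairs = Vec::new();
    for line in kafka_properties.lines() {
        let line = line.trim();
        // Skip blank lines and comments, as the removed in-crate parser did.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if let Some((key, value)) = line.split_once('=') {
            pairs.push((key.trim().to_string(), value.trim().to_string()));
        }
    }

    Ok(pairs)
}
```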
6 changes: 3 additions & 3 deletions crates/audit/tests/integration_tests.rs
@@ -1,6 +1,6 @@
use std::time::Duration;
use tips_audit::{
KafkaMempoolArchiver, KafkaMempoolReader,
KafkaAuditArchiver, KafkaAuditLogReader,
publisher::{BundleEventPublisher, KafkaBundleEventPublisher},
storage::{BundleEventS3Reader, S3EventReaderWriter},
types::{BundleEvent, DropReason},
@@ -37,8 +37,8 @@ async fn test_kafka_publisher_s3_archiver_integration()
publisher.publish(event.clone()).await?;
}

let mut consumer = KafkaMempoolArchiver::new(
KafkaMempoolReader::new(harness.kafka_consumer, topic.to_string())?,
let mut consumer = KafkaAuditArchiver::new(
KafkaAuditLogReader::new(harness.kafka_consumer, topic.to_string())?,
s3_writer.clone(),
);

5 changes: 5 additions & 0 deletions crates/bundle-pool/Cargo.toml
@@ -17,6 +17,8 @@ tracing.workspace = true
tokio.workspace = true
anyhow.workspace = true
async-trait.workspace = true
rdkafka.workspace = true
serde_json.workspace = true

[dev-dependencies]
tips-core = { workspace = true, features = ["test-utils"] }
@@ -26,3 +28,6 @@ alloy-signer = "1.0.41"
alloy-signer-local = "1.0.41"
op-alloy-consensus.workspace = true
op-alloy-rpc-types.workspace = true
testcontainers.workspace = true
testcontainers-modules.workspace = true
serde.workspace = true
31 changes: 31 additions & 0 deletions crates/bundle-pool/src/lib.rs
@@ -1,4 +1,35 @@
pub mod pool;
pub mod source;

use source::BundleSource;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tracing::error;

pub use pool::{BundleStore, InMemoryBundlePool};
pub use source::KafkaBundleSource;
pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};

pub fn connect_sources_to_pool<S, P>(
sources: Vec<S>,
bundle_rx: mpsc::UnboundedReceiver<BundleWithMetadata>,
pool: Arc<Mutex<P>>,
) where
S: BundleSource + Send + 'static,
P: BundleStore + Send + 'static,
{
for source in sources {
tokio::spawn(async move {
if let Err(e) = source.run().await {
error!(error = %e, "Bundle source failed");
}
});
}

tokio::spawn(async move {
let mut bundle_rx = bundle_rx;
while let Some(bundle) = bundle_rx.recv().await {
pool.lock().unwrap().add_bundle(bundle);
}
});
}
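`connect_sources_to_pool` spawns one task per `BundleSource` plus a drain task that pushes every received bundle into the shared pool. A wiring sketch, assuming the crate is imported as `tips_bundle_pool` (the crate name is not spelled out in this diff) and that this is called from inside a Tokio runtime:

```rust
// Illustrative wiring: one KafkaBundleSource feeding an existing bundle pool.
use anyhow::Result;
use rdkafka::ClientConfig;
use std::sync::{Arc, Mutex};
use tips_bundle_pool::{BundleStore, KafkaBundleSource, connect_sources_to_pool};
use tokio::sync::mpsc;

fn wire_bundle_pool<P>(
    client_config: ClientConfig,
    topic: String,
    pool: Arc<Mutex<P>>,
) -> Result<()>
where
    P: BundleStore + Send + 'static,
{
    let (bundle_tx, bundle_rx) = mpsc::unbounded_channel();
    // The source consumes the topic and forwards decoded bundles to bundle_tx.
    let source = KafkaBundleSource::new(client_config, topic, bundle_tx)?;

    // Spawns the source task and the drain task that calls pool.add_bundle().
    connect_sources_to_pool(vec![source], bundle_rx, pool);
    Ok(())
}
```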
81 changes: 81 additions & 0 deletions crates/bundle-pool/src/source.rs
@@ -0,0 +1,81 @@
use anyhow::Result;
use async_trait::async_trait;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};
use tips_core::{Bundle, BundleWithMetadata};
use tokio::sync::mpsc;
use tracing::{debug, error};

#[async_trait]
pub trait BundleSource {
async fn run(&self) -> Result<()>;
}

pub struct KafkaBundleSource {
queue_consumer: StreamConsumer,
publisher: mpsc::UnboundedSender<BundleWithMetadata>,
}

impl KafkaBundleSource {
pub fn new(
client_config: ClientConfig,
topic: String,
publisher: mpsc::UnboundedSender<BundleWithMetadata>,
) -> Result<Self> {
let queue_consumer: StreamConsumer = client_config.create()?;
queue_consumer.subscribe(&[topic.as_str()])?;
Ok(Self {
queue_consumer,
publisher,
})
}
}

#[async_trait]
impl BundleSource for KafkaBundleSource {
async fn run(&self) -> Result<()> {
loop {
match self.queue_consumer.recv().await {
Ok(message) => {
let payload = match message.payload() {
Some(p) => p,
None => {
error!("Message has no payload");
continue;
}
};

let bundle: Bundle = match serde_json::from_slice(payload) {
Ok(b) => b,
Err(e) => {
error!(error = %e, "Failed to deserialize bundle");
continue;
}
};

debug!(
bundle = ?bundle,
offset = message.offset(),
partition = message.partition(),
"Received bundle from Kafka"
);

let bundle_with_metadata = match BundleWithMetadata::load(bundle) {
Ok(b) => b,
Err(e) => {
error!(error = %e, "Failed to load bundle");
continue;
}
};

if let Err(e) = self.publisher.send(bundle_with_metadata) {
error!(error = ?e, "Failed to publish bundle to queue");
}
}
Err(e) => {
error!(error = %e, "Error receiving message from Kafka");
}
}
}
}
}
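On the producing side, this source expects each Kafka message payload to be a `serde_json`-encoded `tips_core::Bundle`; messages that fail to decode are logged and skipped rather than aborting the loop. A hedged producer sketch (the producer setup and the assumption that `Bundle` implements `Serialize` are illustrative; only the JSON-payload format follows from the consumer code above):

```rust
// Illustrative producer for the format KafkaBundleSource consumes:
// a JSON-serialized Bundle as the message payload, with no key.
use anyhow::Result;
use rdkafka::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
use tips_core::Bundle;

async fn publish_bundle(client_config: &ClientConfig, topic: &str, bundle: &Bundle) -> Result<()> {
    let producer: FutureProducer = client_config.create()?;
    let payload = serde_json::to_vec(bundle)?;

    // Wait up to five seconds for the delivery report before giving up.
    producer
        .send(
            FutureRecord::<(), _>::to(topic).payload(&payload),
            Duration::from_secs(5),
        )
        .await
        .map_err(|(err, _msg)| anyhow::anyhow!(err))?;

    Ok(())
}
```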