1 change: 1 addition & 0 deletions .env.example
@@ -17,6 +17,7 @@ TIPS_INGRESS_BLOCK_TIME_MILLISECONDS=2000
TIPS_INGRESS_METER_BUNDLE_TIMEOUT_MS=2000
TIPS_INGRESS_MAX_BUFFERED_METER_BUNDLE_RESPONSES=100
TIPS_INGRESS_BUILDER_RPCS=http://localhost:2222,http://localhost:2222,http://localhost:2222
TIPS_INGRESS_BACKRUN_ENABLED=true

# Audit service configuration
TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties
9 changes: 6 additions & 3 deletions crates/ingress-rpc/src/bin/main.rs
@@ -5,9 +5,9 @@ use op_alloy_network::Optimism;
use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher};
use tips_core::MeterBundleResponse;
use tips_core::kafka::load_kafka_config_from_file;
use tips_core::logger::init_logger_with_format;
use tips_core::{Bundle, MeterBundleResponse};
use tips_ingress_rpc::Config;
use tips_ingress_rpc::connect_ingress_to_builder;
use tips_ingress_rpc::health::bind_health_server;
@@ -68,9 +68,11 @@ async fn main() -> anyhow::Result<()> {

let (builder_tx, _) =
broadcast::channel::<MeterBundleResponse>(config.max_buffered_meter_bundle_responses);
let (builder_backrun_tx, _) = broadcast::channel::<Bundle>(config.max_buffered_backrun_bundles);
config.builder_rpcs.iter().for_each(|builder_rpc| {
let builder_rx = builder_tx.subscribe();
connect_ingress_to_builder(builder_rx, builder_rpc.clone());
let metering_rx = builder_tx.subscribe();
let backrun_rx = builder_backrun_tx.subscribe();
connect_ingress_to_builder(metering_rx, backrun_rx, builder_rpc.clone());
});

let health_check_addr = config.health_check_addr;
@@ -86,6 +88,7 @@ async fn main() -> anyhow::Result<()> {
queue,
audit_tx,
builder_tx,
builder_backrun_tx,
cfg,
);
let bind_addr = format!("{}:{}", config.address, config.port);
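Read together, the two hunks above replace the single metering channel with a metering channel plus a backrun channel. A minimal sketch of the resulting wiring in `main.rs` — variable names are taken from the hunks, while the `IngressService::new` call and the surrounding `config`/`queue`/`audit_tx` setup are assumed from the rest of the file, which the diff does not show in full:

```rust
// Sketch only — the surrounding setup (config parsing, queue, audit channel) lives
// elsewhere in main() and is assumed here.
let (builder_tx, _) =
    broadcast::channel::<MeterBundleResponse>(config.max_buffered_meter_bundle_responses);
let (builder_backrun_tx, _) =
    broadcast::channel::<Bundle>(config.max_buffered_backrun_bundles);

// Every builder RPC gets its own receiver for metering responses and for backrun bundles.
config.builder_rpcs.iter().for_each(|builder_rpc| {
    let metering_rx = builder_tx.subscribe();
    let backrun_rx = builder_backrun_tx.subscribe();
    connect_ingress_to_builder(metering_rx, backrun_rx, builder_rpc.clone());
});

// Both senders are handed to the ingress service so its RPC handlers can fan events
// out to every connected builder (constructor shape assumed from the service.rs diff).
let server = IngressService::new(queue, audit_tx, builder_tx, builder_backrun_tx, cfg);
```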
43 changes: 35 additions & 8 deletions crates/ingress-rpc/src/lib.rs
@@ -138,30 +138,44 @@ pub struct Config {
)]
pub max_buffered_meter_bundle_responses: usize,

/// Maximum number of backrun bundles to buffer in memory
#[arg(
long,
env = "TIPS_INGRESS_MAX_BUFFERED_BACKRUN_BUNDLES",
default_value = "100"
)]
pub max_buffered_backrun_bundles: usize,

/// Address to bind the health check server to
#[arg(
long,
env = "TIPS_INGRESS_HEALTH_CHECK_ADDR",
default_value = "0.0.0.0:8081"
)]
pub health_check_addr: SocketAddr,

/// Enable backrun bundle submission to op-rbuilder
#[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")]
pub backrun_enabled: bool,
}

pub fn connect_ingress_to_builder(
event_rx: broadcast::Receiver<MeterBundleResponse>,
metering_rx: broadcast::Receiver<MeterBundleResponse>,
backrun_rx: broadcast::Receiver<tips_core::Bundle>,
builder_rpc: Url,
) {
tokio::spawn(async move {
let builder: RootProvider<Optimism> = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(builder_rpc);
let builder: RootProvider<Optimism> = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(builder_rpc);

let mut event_rx = event_rx;
let metering_builder = builder.clone();
tokio::spawn(async move {
let mut event_rx = metering_rx;
while let Ok(event) = event_rx.recv().await {
// we only support one transaction per bundle for now
let tx_hash = event.results[0].tx_hash;
if let Err(e) = builder
if let Err(e) = metering_builder
.client()
.request::<(TxHash, MeterBundleResponse), ()>(
"base_setMeteringInformation",
@@ -173,4 +187,17 @@ pub fn connect_ingress_to_builder(
}
}
});

tokio::spawn(async move {
let mut event_rx = backrun_rx;
while let Ok(bundle) = event_rx.recv().await {
if let Err(e) = builder
.client()
.request::<(tips_core::Bundle,), ()>("base_sendBackrunBundle", (bundle,))
.await
{
error!(error = %e, "Failed to send backrun bundle to builder");
}
}
});
}
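Because this hunk interleaves the removed and added lines, here is how `connect_ingress_to_builder` reads after the change, reconstructed from the added lines only. Import paths, the metering request payload, and its error handling are elided or assumed where the hunk does not show them; treat this as a sketch, not the final file:

```rust
// Reconstruction of the post-change function (imports mirror the crate's existing ones).
pub fn connect_ingress_to_builder(
    metering_rx: broadcast::Receiver<MeterBundleResponse>,
    backrun_rx: broadcast::Receiver<tips_core::Bundle>,
    builder_rpc: Url,
) {
    // One provider per builder RPC, now created outside the spawned tasks and shared.
    let builder: RootProvider<Optimism> = ProviderBuilder::new()
        .disable_recommended_fillers()
        .network::<Optimism>()
        .connect_http(builder_rpc);

    // Task 1: forward metering information (pre-existing behaviour, moved onto a clone).
    let metering_builder = builder.clone();
    tokio::spawn(async move {
        let mut event_rx = metering_rx;
        while let Ok(event) = event_rx.recv().await {
            // we only support one transaction per bundle for now
            let tx_hash = event.results[0].tx_hash;
            if let Err(e) = metering_builder
                .client()
                .request::<(TxHash, MeterBundleResponse), ()>(
                    "base_setMeteringInformation",
                    (tx_hash, event.clone()), // payload assumed; elided in the hunk above
                )
                .await
            {
                // placeholder log — the original error handling is elided in the hunk
                error!(error = %e, "metering request failed");
            }
        }
    });

    // Task 2 (new): forward backrun bundles to the builder's base_sendBackrunBundle endpoint.
    tokio::spawn(async move {
        let mut event_rx = backrun_rx;
        while let Ok(bundle) = event_rx.recv().await {
            if let Err(e) = builder
                .client()
                .request::<(tips_core::Bundle,), ()>("base_sendBackrunBundle", (bundle,))
                .await
            {
                error!(error = %e, "Failed to send backrun bundle to builder");
            }
        }
    });
}
```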
8 changes: 7 additions & 1 deletion crates/ingress-rpc/src/metrics.rs
@@ -1,4 +1,4 @@
use metrics::Histogram;
use metrics::{Counter, Histogram};
use metrics_derive::Metrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;
@@ -29,6 +29,12 @@ pub struct Metrics {

#[metric(describe = "Duration of send_raw_transaction")]
pub send_raw_transaction_duration: Histogram,

#[metric(describe = "Total backrun bundles received")]
pub backrun_bundles_received_total: Counter,

#[metric(describe = "Duration to send backrun bundle to op-rbuilder")]
pub backrun_bundles_sent_duration: Histogram,
}

/// Initialize Prometheus metrics exporter
111 changes: 81 additions & 30 deletions crates/ingress-rpc/src/service.rs
@@ -31,6 +31,9 @@ pub trait IngressApi {
#[method(name = "sendBundle")]
async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash>;

#[method(name = "sendBackrunBundle")]
async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash>;

/// `eth_cancelBundle` is used to prevent a submitted bundle from being included on-chain.
#[method(name = "cancelBundle")]
async fn cancel_bundle(&self, request: CancelBundle) -> RpcResult<()>;
@@ -58,6 +61,8 @@ pub struct IngressService<Queue> {
block_time_milliseconds: u64,
meter_bundle_timeout_ms: u64,
builder_tx: broadcast::Sender<MeterBundleResponse>,
backrun_enabled: bool,
builder_backrun_tx: broadcast::Sender<Bundle>,
}

impl<Queue> IngressService<Queue> {
@@ -67,6 +72,7 @@ impl<Queue> IngressService<Queue> {
queue: Queue,
audit_channel: mpsc::UnboundedSender<BundleEvent>,
builder_tx: broadcast::Sender<MeterBundleResponse>,
builder_backrun_tx: broadcast::Sender<Bundle>,
config: Config,
) -> Self {
Self {
@@ -81,6 +87,8 @@ impl<Queue> IngressService<Queue> {
block_time_milliseconds: config.block_time_milliseconds,
meter_bundle_timeout_ms: config.meter_bundle_timeout_ms,
builder_tx,
backrun_enabled: config.backrun_enabled,
builder_backrun_tx,
}
}
}
@@ -90,16 +98,45 @@ impl<Queue> IngressApiServer for IngressService<Queue>
where
Queue: QueuePublisher + Sync + Send + 'static,
{
async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
if !self.backrun_enabled {
info!(
message = "Backrun bundle submission is disabled",
backrun_enabled = self.backrun_enabled
);
return Err(
EthApiError::InvalidParams("Backrun bundle submission is disabled".into())
.into_rpc_err(),
);
}

let start = Instant::now();
let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?;

self.metrics.backrun_bundles_received_total.increment(1);

if let Err(e) = self.builder_backrun_tx.send(bundle) {
warn!(
message = "Failed to send backrun bundle to builders",
bundle_hash = %bundle_hash,
error = %e
);
}

self.send_audit_event(&accepted_bundle, bundle_hash);

self.metrics
.backrun_bundles_sent_duration
.record(start.elapsed().as_secs_f64());

Ok(BundleHash { bundle_hash })
}

async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
// validate the bundle and consume the `bundle` to get an `AcceptedBundle`
self.validate_bundle(&bundle).await?;
let parsed_bundle: ParsedBundle = bundle
.clone()
.try_into()
.map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?;
let bundle_hash = &parsed_bundle.bundle_hash();
let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?;
let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone());
let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?;

// Get meter_bundle_response for builder broadcast
let meter_bundle_response = accepted_bundle.meter_bundle_response.clone();

// asynchronously send the meter bundle response to the builder
self.builder_tx
@@ -109,7 +146,7 @@ where
// publish the bundle to the queue
if let Err(e) = self
.bundle_queue
.publish(&accepted_bundle, bundle_hash)
.publish(&accepted_bundle, &bundle_hash)
.await
{
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
@@ -122,20 +159,9 @@ where
);

// asynchronously send the audit event to the audit channel
let audit_event = BundleEvent::Received {
bundle_id: *accepted_bundle.uuid(),
bundle: Box::new(accepted_bundle.clone()),
};
if let Err(e) = self.audit_channel.send(audit_event) {
warn!(message = "Failed to send audit event", error = %e);
return Err(
EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(),
);
}
self.send_audit_event(&accepted_bundle, bundle_hash);

Ok(BundleHash {
bundle_hash: *bundle_hash,
})
Ok(BundleHash { bundle_hash })
}

async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> {
@@ -212,13 +238,7 @@ where
}
}

let audit_event = BundleEvent::Received {
bundle_id: *accepted_bundle.uuid(),
bundle: accepted_bundle.clone().into(),
};
if let Err(e) = self.audit_channel.send(audit_event) {
warn!(message = "Failed to send audit event", error = %e);
}
self.send_audit_event(&accepted_bundle, transaction.tx_hash());

self.metrics
.send_raw_transaction_duration
@@ -327,6 +347,37 @@ where
}
Ok(res)
}

/// Helper method to validate, parse, and meter a bundle
async fn validate_parse_and_meter_bundle(
&self,
bundle: &Bundle,
) -> RpcResult<(AcceptedBundle, B256)> {
self.validate_bundle(bundle).await?;
let parsed_bundle: ParsedBundle = bundle
.clone()
.try_into()
.map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?;
let bundle_hash = parsed_bundle.bundle_hash();
let meter_bundle_response = self.meter_bundle(bundle, &bundle_hash).await?;
let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone());
Ok((accepted_bundle, bundle_hash))
}

/// Helper method to send audit event for a bundle
fn send_audit_event(&self, accepted_bundle: &AcceptedBundle, bundle_hash: B256) {
let audit_event = BundleEvent::Received {
bundle_id: *accepted_bundle.uuid(),
bundle: Box::new(accepted_bundle.clone()),
};
if let Err(e) = self.audit_channel.send(audit_event) {
warn!(
message = "Failed to send audit event",
bundle_hash = %bundle_hash,
error = %e
);
}
}
}

#[cfg(test)]
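For a quick end-to-end check of the new `sendBackrunBundle` method, a plain JSON-RPC POST is enough — the same call the `send-txn-with-backrun` recipe below makes with curl. A minimal Rust sketch, assuming the ingress server listens on `http://localhost:8080`, that the method is exposed as `eth_sendBackrunBundle`, and that the bundle JSON follows the shape built in the justfile (the authoritative schema is `tips_core::Bundle`):

```rust
// Hypothetical client sketch using reqwest + serde_json; the endpoint URL and field
// values are placeholders, and the params shape is assumed from the justfile recipe.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_sendBackrunBundle",
        "params": [{
            "txs": ["0x<signed target tx>", "0x<signed backrun tx>"],
            "blockNumber": 0,
            "revertingTxHashes": ["0x<target tx hash>", "0x<backrun tx hash>"]
        }]
    });

    let response: serde_json::Value = reqwest::Client::new()
        .post("http://localhost:8080") // assumed ingress address
        .json(&request)
        .send()
        .await?
        .json()
        .await?;

    // On success the result carries the bundle hash (see `BundleHash` in the diff above);
    // when TIPS_INGRESS_BACKRUN_ENABLED is false the call returns an invalid-params error.
    println!("{response}");
    Ok(())
}
```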
67 changes: 67 additions & 0 deletions justfile
@@ -99,6 +99,9 @@ get-blocks:
sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"
sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"

backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC"
backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a"

send-txn:
#!/usr/bin/env bash
set -euxo pipefail
@@ -108,3 +111,67 @@ send-txn:
hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result")
cast receipt $hash -r {{ sequencer_url }} | grep status
cast receipt $hash -r {{ builder_url }} | grep status

send-txn-with-backrun:
#!/usr/bin/env bash
set -euxo pipefail

# 1. Get nonce and send target transaction from sender account
nonce=$(cast nonce {{ sender }} -r {{ builder_url }})
echo "Sending target transaction from sender (nonce=$nonce)..."
target_txn=$(cast mktx --private-key {{ sender_key }} \
0x0000000000000000000000000000000000000000 \
--value 0.01ether \
--nonce $nonce \
--chain-id 13 \
-r {{ builder_url }})

target_hash=$(curl -s {{ ingress_url }} -X POST \
-H "Content-Type: application/json" \
--data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" \
| jq -r ".result")
echo "Target tx sent: $target_hash"

# 2. Build backrun transaction from backrunner account (different account!)
backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }})
echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..."
backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \
0x0000000000000000000000000000000000000001 \
--value 0.001ether \
--nonce $backrun_nonce \
--chain-id 13 \
-r {{ builder_url }})

# 3. Compute tx hashes for reverting_tx_hashes
backrun_hash_computed=$(cast keccak $backrun_txn)
echo "Target tx hash: $target_hash"
echo "Backrun tx hash: $backrun_hash_computed"

# 4. Construct and send bundle with reverting_tx_hashes
echo "Sending backrun bundle..."
bundle_json=$(jq -n \
--arg target "$target_txn" \
--arg backrun "$backrun_txn" \
--arg target_hash "$target_hash" \
--arg backrun_hash "$backrun_hash_computed" \
'{
txs: [$target, $backrun],
blockNumber: 0,
revertingTxHashes: [$target_hash, $backrun_hash]
}')

bundle_hash=$(curl -s {{ ingress_url }} -X POST \
-H "Content-Type: application/json" \
--data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" \
| jq -r ".result")
echo "Bundle sent: $bundle_hash"

# 5. Wait and verify both transactions
echo "Waiting for transactions to land..."
sleep 5

echo "=== Target transaction (from sender) ==="
cast receipt $target_hash -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)"

echo "=== Backrun transaction (from backrunner) ==="
cast receipt $backrun_hash_computed -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" || echo "Backrun tx not found yet"