Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

> Note: this file did not exist until after `v0.5.6`.

## Unreleased

- (rpc): fix CPU usage bugs ([#527](https://github.com/flashbots/contender/pull/527/changes))

## [0.10.0](https://github.com/flashbots/contender/releases/tag/v0.10.0) - 2026-04-20

- (rpc): accept eth denominations (e.g. "1 eth") for min_balance ([#518](https://github.com/flashbots/contender/pull/518))
Expand Down
27 changes: 20 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ contender_engine_provider = { path = "crates/engine_provider/" }
contender_report = { path = "crates/report/" }

tokio = { version = "1.40.0" }
tokio-metrics = { version = "0.5.0" }
tokio-tungstenite = { version = "0.26", features = ["native-tls"] }
tokio-util = "0.7"
alloy = { version = "1.0.22" }
Expand Down
4 changes: 4 additions & 0 deletions crates/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

- (rpc): fix CPU usage bugs ([#527](https://github.com/flashbots/contender/pull/527/changes))

## [0.10.0](https://github.com/flashbots/contender/releases/tag/v0.10.0) - 2026-04-20

- (rpc): accept eth denominations (e.g. "1 eth") for min_balance ([#518](https://github.com/flashbots/contender/pull/518/changes))
Expand Down
4 changes: 3 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ contender_report = { workspace = true }
nu-ansi-term = { workspace = true }
console-subscriber = { workspace = true, optional = true }
serde = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "tracing"] }
tokio = { workspace = true, features = ["rt-multi-thread", "tracing", "rt"] }
tokio-metrics = { workspace = true, optional = true }
tokio-util = { workspace = true }
alloy = { workspace = true, features = [
"full",
Expand Down Expand Up @@ -61,6 +62,7 @@ tempfile = "3.15.0"
[features]
default = []
async-tracing = ["dep:console-subscriber"]
tokio-metrics = ["dep:tokio-metrics"]

[build-dependencies]
syn = { version = "2", features = ["full"] }
Expand Down
5 changes: 4 additions & 1 deletion crates/cli/src/commands/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use crate::{
util::{check_private_keys, find_insufficient_balances, fund_accounts, load_seedfile},
LATENCY_HIST as HIST, PROM,
};
use contender_core::util::get_block_time;
use contender_core::{
generator::{
agent_pools::{AgentPools, AgentSpec},
RandSeed,
},
test_scenario::{TestScenario, TestScenarioParams},
};
use contender_core::{util::get_block_time, CancellationToken};
use contender_engine_provider::DEFAULT_BLOCK_TIME;
use contender_testfile::TestConfig;
use std::{
Expand Down Expand Up @@ -99,6 +99,7 @@ pub async fn setup(
.await?;
}

let cancel_token = CancellationToken::new();
let params = TestScenarioParams {
rpc_url: args.eth_json_rpc_args.rpc_url,
builder_rpc_url: None,
Expand All @@ -123,6 +124,7 @@ pub async fn setup(
params,
engine_params.engine_provider,
(&PROM, &HIST).into(),
&cancel_token,
)
.await?;

Expand Down Expand Up @@ -199,6 +201,7 @@ pub async fn setup(

_ = tokio::signal::ctrl_c() => {
warn!("Setup cancelled.");
cancel_token.cancel();
is_done.store(true, Ordering::SeqCst);
},
}
Expand Down
11 changes: 11 additions & 0 deletions crates/cli/src/commands/spam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl SpamCommandArgs {
None
};

let cancel_token = CancellationToken::new();
let params = TestScenarioParams {
rpc_url: self.spam_args.eth_json_rpc_args.rpc_args.rpc_url.clone(),
builder_rpc_url: builder_url.to_owned(),
Expand Down Expand Up @@ -559,6 +560,7 @@ impl SpamCommandArgs {
params,
engine_params.engine_provider.clone(),
(&PROM, &HIST).into(),
&cancel_token,
)
.await?;

Expand All @@ -573,6 +575,7 @@ impl SpamCommandArgs {
}
.into());
}

tokio::select! {
inner_res = async move {
if let Some(handle) = fcu_handle {
Expand All @@ -592,6 +595,14 @@ impl SpamCommandArgs {
} => {
inner_res
}
_ = cancel_token.cancelled() => {
warn!("Setup cancelled.");
return Err(
CliError::Core(
RuntimeErrorKind::InitializationCancelled.into()
)
);
}
}?;
}
done_fcu.store(true, std::sync::atomic::Ordering::SeqCst);
Expand Down
21 changes: 21 additions & 0 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,30 @@ use tracing_subscriber::EnvFilter;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> miette::Result<()> {
init_tokio_metrics();
run().await.map_err(|e| e.into())
}

/// Initializes tokio metrics collection/logging thread.
/// No-op if the "tokio-metrics" feature is disabled.
fn init_tokio_metrics() {
#[cfg(feature = "tokio-metrics")]
{
use std::time::Duration;
use tokio_metrics::RuntimeMonitor;
let handle = tokio::runtime::Handle::current();
let monitor = RuntimeMonitor::new(&handle);
tokio::spawn(async move {
for interval in monitor.intervals() {
// pretty-print the metric interval
println!("{:#?}", interval);
// wait
tokio::time::sleep(Duration::from_millis(1000)).await;
}
});
}
}

async fn run() -> Result<(), contender_cli::Error> {
let args = ContenderCli::parse_args();

Expand Down
31 changes: 15 additions & 16 deletions crates/cli/src/server/rpc_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,14 @@ impl ContenderRpcServer for ContenderServer {
params: AddSessionParams,
) -> jsonrpsee::core::RpcResult<ContenderSessionInfo> {
let session_seed;
let info;
{
let info = {
let mut sessions = self.sessions.write().await;
session_seed = RandSeed::seed_from_bytes(&sessions.num_sessions().to_be_bytes());
let session = sessions
.add_session(params.to_new_session_params(session_seed).await?)
.await?;
info = session.info.clone();
}
session.info.clone()
};

let session_id = info.id;
let sessions = Arc::clone(&self.sessions);
Expand All @@ -108,7 +107,6 @@ impl ContenderRpcServer for ContenderServer {
let mut lock = sessions.write().await;
lock.take_uninitialized(session_id)
};

let Some(contender) = contender else {
return;
};
Expand All @@ -123,7 +121,7 @@ impl ContenderRpcServer for ContenderServer {
}
}
Err(e) => {
let msg = e.to_string();
let msg = format!("{e:?}");
let mut lock = sessions.write().await;
if let Some(session) = lock.get_session_mut(session_id) {
session.info.status = SessionStatus::Failed(msg.clone());
Expand Down Expand Up @@ -158,7 +156,7 @@ impl ContenderRpcServer for ContenderServer {

async fn remove_session(&self, id: usize) -> jsonrpsee::core::RpcResult<()> {
let mut sessions = self.sessions.write().await;
sessions.remove_session(id);
sessions.remove_session(id).await;
Ok(())
}

Expand Down Expand Up @@ -401,15 +399,16 @@ impl ContenderRpcServer for ContenderServer {

async fn stop(&self, session_id: usize) -> jsonrpsee::core::RpcResult<String> {
let span = tracing::info_span!("session_stop", id = session_id);
let sessions = self.sessions.read().await;
let Some(session) = sessions.get_session(session_id) else {
return Err(ContenderRpcError::SessionNotFound(session_id).into());
};
let Some(ref token) = session.spam_cancel else {
return Err(ContenderRpcError::SessionNotBusy(session_id).into());
};
token.cancel();
drop(sessions);
{
let sessions = self.sessions.read().await;
let Some(session) = sessions.get_session(session_id) else {
return Err(ContenderRpcError::SessionNotFound(session_id).into());
};
let Some(ref token) = session.spam_cancel else {
return Err(ContenderRpcError::SessionNotBusy(session_id).into());
};
token.cancel();
}
{
let _enter = span.enter();
info!("Sent stop signal to session {session_id}");
Expand Down
28 changes: 25 additions & 3 deletions crates/cli/src/server/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ impl ContenderSession {
.create_contender(params.test_config, params.options)
.await?;
let (log_channel, _) = broadcast::channel(4096);
let cancel = contender.cancel_token();
Ok(Self {
info,
contender: Some(SessionContender::Uninit(contender)),
log_channel,
cancel: CancellationToken::new(),
cancel,
spam_cancel: None,
funder: None,
agent_store: None,
Expand Down Expand Up @@ -294,12 +295,33 @@ impl ContenderSessionCache {
}
}

pub fn remove_session(&mut self, id: SessionId) {
if let Some(session) = self.get_session(id) {
pub async fn remove_session(&mut self, id: SessionId) {
// If the session exists and has an initialized Contender, shut it down first.
if let Some(session) = self.get_session_mut(id) {
// Stop any running spam before tearing down.
if let Some(ref token) = session.spam_cancel {
token.cancel();
}

// If the session has an initialized Contender, take it and shut it down.
let maybe_contender = match session.contender.take() {
Some(SessionContender::Init(c)) => Some(c),
Some(SessionContender::Uninit(c)) => {
c.cancel();
None
}
other => {
session.contender = other;
None
}
};
if let Some(mut contender) = maybe_contender {
// Call shutdown on the scenario to stop all background actors.
// This is async, so we must await it.
let scenario = contender.scenario_mut();
scenario.shutdown().await;
}

// Cancel subscriber streams before dropping the session.
session.cancel.cancel();
}
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/server/static/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"BundleCallDefinition": {"file":"crates/core/src/generator/function_def.rs","line":53},
"BundleTypeCli": {"file":"crates/cli/src/commands/common.rs","line":426},
"CompiledContract": {"file":"crates/core/src/generator/create_def.rs","line":8},
"ContenderSessionInfo": {"file":"crates/cli/src/server/sessions.rs","line":127},
"ContenderSessionInfo": {"file":"crates/cli/src/server/sessions.rs","line":128},
"CreateDefinition": {"file":"crates/core/src/generator/create_def.rs","line":65},
"CustomContractCliArgs": {"file":"crates/cli/src/default_scenarios/custom_contract.rs","line":17},
"EngineMessageVersion": {"file":"crates/cli/src/commands/common.rs","line":268},
Expand Down
10 changes: 10 additions & 0 deletions crates/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

- (rpc): fix CPU usage bugs ([#527](https://github.com/flashbots/contender/pull/527/changes))

### Breaking changes

*from [#527](https://github.com/flashbots/contender/pull/527/changes):*

- `TestScenario::new` added a param: `cancel_token: &CancellationToken`

## [0.10.0](https://github.com/flashbots/contender/releases/tag/v0.10.0) - 2026-04-20

- refactor `utils::{parse_value, parse_value_opt}` to accept numbers or strings for deser ([#518](https://github.com/flashbots/contender/pull/518/changes))
Expand Down
Loading
Loading