Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ crc32fast = "1.5.0"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.17.0"

[target.'cfg(target_os = "macos")'.dependencies]
mach2 = "0.4"

[dev-dependencies]
temp-env = { version = "0.3.6", features = ["async_closure"] }
insta = { version = "1.29.0", features = ["json", "redactions"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/runner-shared/src/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum MarkerType {

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntegrationMode {
Perf,
Walltime,
Simulation,
Analysis,
}
Expand All @@ -46,7 +46,7 @@ pub enum Command {
StopBenchmark,
Ack,
#[deprecated(note = "Use `GetIntegrationMode` instead")]
PingPerf,
PingProfiler,
SetIntegration {
name: String,
version: String,
Expand Down
8 changes: 4 additions & 4 deletions crates/runner-shared/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::module_symbols::MappedProcessModuleSymbols;
use crate::unwind_data::MappedProcessUnwindData;

#[derive(Serialize, Deserialize, Default)]
pub struct PerfMetadata {
pub struct WalltimeMetadata {
/// The version of this metadata format.
pub version: u64,

Expand Down Expand Up @@ -71,13 +71,13 @@ pub struct PerfMetadata {
pub debug_info_by_pid: HashMap<pid_t, Vec<ModuleDebugInfo>>,
}

impl PerfMetadata {
impl WalltimeMetadata {
pub fn from_reader<R: std::io::Read>(reader: R) -> anyhow::Result<Self> {
serde_json::from_reader(reader).context("Could not parse perf metadata from JSON")
serde_json::from_reader(reader).context("Could not parse walltime metadata from JSON")
}

pub fn save_to<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
let file = std::fs::File::create(path.as_ref().join("perf.metadata"))?;
let file = std::fs::File::create(path.as_ref().join("walltime.metadata"))?;
const BUFFER_SIZE: usize = 256 * 1024 /* 256 KB */;

let writer = BufWriter::with_capacity(BUFFER_SIZE, file);
Expand Down
2 changes: 1 addition & 1 deletion src/cli/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn build_orchestrator_config(
modes,
instruments: Instruments { mongodb: None }, // exec doesn't support MongoDB
perf_unwinding_mode: args.shared.perf_run_args.perf_unwinding_mode,
enable_perf: args.shared.perf_run_args.enable_perf,
enable_profiler: args.shared.perf_run_args.resolve_enable_profiler(),
simulation_tool: args.shared.simulation_tool.unwrap_or_default(),
profile_folder: args.shared.profile_folder,
skip_upload: args.shared.skip_upload,
Expand Down
5 changes: 3 additions & 2 deletions src/cli/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl RunArgs {
show_full_output: false,
base: None,
perf_run_args: PerfRunArgs {
enable_perf: false,
enable_profiler: false,
enable_perf: None,
perf_unwinding_mode: None,
},
experimental: ExperimentalArgs {
Expand Down Expand Up @@ -112,7 +113,7 @@ fn build_orchestrator_config(
modes,
instruments,
perf_unwinding_mode: args.shared.perf_run_args.perf_unwinding_mode,
enable_perf: args.shared.perf_run_args.enable_perf,
enable_profiler: args.shared.perf_run_args.resolve_enable_profiler(),
simulation_tool: args.shared.simulation_tool.unwrap_or_default(),
profile_folder: args.shared.profile_folder,
skip_upload: args.shared.skip_upload,
Expand Down
24 changes: 21 additions & 3 deletions src/cli/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,34 @@ pub enum UnwindingMode {

#[derive(Args, Debug, Clone)]
pub struct PerfRunArgs {
/// Enable the linux perf profiler to collect granular performance data.
/// Enable a profiler to collect granular performance data.
/// This is only supported on Linux.
#[arg(long, env = "CODSPEED_PERF_ENABLED", default_value_t = true)]
pub enable_perf: bool,
#[arg(long, env = "CODSPEED_PROFILER_ENABLED", default_value_t = true)]
pub enable_profiler: bool,

/// Deprecated alias for --enable-profiler / CODSPEED_PROFILER_ENABLED.
#[arg(long, env = "CODSPEED_PERF_ENABLED", hide = true)]
pub enable_perf: Option<bool>,

/// The unwinding mode that should be used with perf to collect the call stack.
#[arg(long, env = "CODSPEED_PERF_UNWINDING_MODE")]
pub perf_unwinding_mode: Option<UnwindingMode>,
}

impl PerfRunArgs {
/// Resolves the effective `enable_profiler` value, honoring the deprecated
/// `--enable-perf` / `CODSPEED_PERF_ENABLED` flag with a warning.
pub fn resolve_enable_profiler(&self) -> bool {
let Some(legacy) = self.enable_perf else {
return self.enable_profiler;
};
log::warn!(
"CODSPEED_PERF_ENABLED / --enable-perf is deprecated; use CODSPEED_PROFILER_ENABLED / --enable-profiler instead."
);
legacy
}
}

/// Parser for go-runner version that validates semver format
fn parse_version(s: &str) -> Result<semver::Version, String> {
semver::Version::parse(s).map_err(|e| format!("Invalid semantic version: {e}"))
Expand Down
8 changes: 4 additions & 4 deletions src/executor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct OrchestratorConfig {

pub modes: Vec<RunnerMode>,
pub instruments: Instruments,
pub enable_perf: bool,
pub enable_profiler: bool,
/// Stack unwinding mode for perf (if enabled)
pub perf_unwinding_mode: Option<UnwindingMode>,

Expand Down Expand Up @@ -96,7 +96,7 @@ pub struct ExecutorConfig {
pub command: String,

pub instruments: Instruments,
pub enable_perf: bool,
pub enable_profiler: bool,
/// Stack unwinding mode for perf (if enabled)
pub perf_unwinding_mode: Option<UnwindingMode>,

Expand Down Expand Up @@ -181,7 +181,7 @@ impl OrchestratorConfig {
working_directory: self.working_directory.clone(),
command,
instruments: self.instruments.clone(),
enable_perf: self.enable_perf,
enable_profiler: self.enable_profiler,
perf_unwinding_mode: self.perf_unwinding_mode,
simulation_tool: self.simulation_tool,
skip_run: self.skip_run,
Expand Down Expand Up @@ -217,7 +217,7 @@ impl OrchestratorConfig {
modes: vec![RunnerMode::Simulation],
instruments: Instruments::test(),
perf_unwinding_mode: None,
enable_perf: false,
enable_profiler: false,
simulation_tool: SimulationTool::default(),
profile_folder: None,
skip_upload: false,
Expand Down
2 changes: 1 addition & 1 deletion src/executor/memory/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Executor for MemoryExecutor {
}

async fn run(
&self,
&mut self,
execution_context: &ExecutionContext,
_mongo_tracer: &Option<MongoTracer>,
) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub trait Executor {

/// Runs the executor
async fn run(
&self,
&mut self,
execution_context: &ExecutionContext,
// TODO: use Instruments instead of directly passing the mongodb tracer
mongo_tracer: &Option<MongoTracer>,
Expand All @@ -118,7 +118,7 @@ pub trait Executor {
/// Run a single executor: setup → run → teardown → persist logs.
/// Does NOT upload.
pub async fn run_executor(
executor: &dyn Executor,
executor: &mut dyn Executor,
orchestrator: &Orchestrator,
execution_context: &ExecutionContext,
setup_cache_dir: Option<&Path>,
Expand Down
4 changes: 2 additions & 2 deletions src/executor/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Orchestrator {
let config = self
.config
.executor_config_for_command(part.command, !part.uses_exec_harness);
let executor = get_executor_from_mode(part.mode);
let mut executor = get_executor_from_mode(part.mode);
let profile_folder =
self.resolve_profile_folder(&executor.name(), run_part_index, total_parts)?;

Expand All @@ -167,7 +167,7 @@ impl Orchestrator {
activate_rolling_buffer(&part.label);
}

run_executor(executor.as_ref(), self, &ctx, setup_cache_dir).await?;
run_executor(executor.as_mut(), self, &ctx, setup_cache_dir).await?;

if !self.config.show_full_output {
deactivate_rolling_buffer();
Expand Down
74 changes: 58 additions & 16 deletions src/executor/shared/fifo.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::prelude::*;
use anyhow::Context;
use futures::StreamExt;
use nix::{sys::time::TimeValLike, time::clock_gettime};
use runner_shared::artifacts::ExecutionTimestamps;
use runner_shared::fifo::{Command as FifoCommand, MarkerType};
use runner_shared::fifo::{RUNNER_ACK_FIFO, RUNNER_CTL_FIFO};
use std::cmp::Ordering;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::{collections::HashSet, time::Duration};
use tokio::io::AsyncWriteExt;
use tokio::net::unix::pid_t;
use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions;
use tokio::net::unix::pipe::Receiver as TokioPipeReader;
use tokio::net::unix::pipe::Sender as TokioPipeSender;
use tokio::time::error::Elapsed;
Expand Down Expand Up @@ -38,8 +37,8 @@ impl GenericFifo {
create_fifo(ctl_fifo)?;
create_fifo(ack_fifo)?;

let ctl_sender = get_pipe_open_options().open_sender(ctl_fifo)?;
let ack_reader = get_pipe_open_options().open_receiver(ack_fifo)?;
let ctl_sender = open_fifo_sender(ctl_fifo)?;
let ack_reader = open_fifo_receiver(ack_fifo)?;

Ok(Self {
ctl_path: ctl_fifo.to_path_buf(),
Expand Down Expand Up @@ -85,12 +84,27 @@ pub struct RunnerFifo {
ctl_reader: FramedRead<TokioPipeReader, LengthDelimitedCodec>,
}

fn get_pipe_open_options() -> TokioPipeOpenOptions {
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
let mut options = TokioPipeOpenOptions::new();
#[cfg(target_os = "linux")]
options.read_write(true);
options
/// Open a FIFO in O_RDWR | O_NONBLOCK mode.
///
/// Tokio's `OpenOptions::read_write(true)` is Linux-only, but the underlying O_RDWR
/// trick works on every Unix: opening a FIFO read-write avoids the deadlock where
/// `open(O_WRONLY)` blocks (or returns ENXIO under O_NONBLOCK) until a reader is
/// connected, and vice versa. Since we open both ends before the peer process
/// (the integration) is even spawned, we need this on macOS too.
fn open_fifo_rdwr(path: &Path) -> anyhow::Result<std::fs::File> {
Ok(std::fs::OpenOptions::new()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)?)
}

fn open_fifo_sender(path: &Path) -> anyhow::Result<TokioPipeSender> {
Ok(TokioPipeSender::from_file(open_fifo_rdwr(path)?)?)
}

fn open_fifo_receiver(path: &Path) -> anyhow::Result<TokioPipeReader> {
Ok(TokioPipeReader::from_file(open_fifo_rdwr(path)?)?)
}

impl RunnerFifo {
Expand All @@ -102,8 +116,8 @@ impl RunnerFifo {
create_fifo(ctl_path)?;
create_fifo(ack_path)?;

let ack_fifo = get_pipe_open_options().open_sender(ack_path)?;
let ctl_fifo = get_pipe_open_options().open_receiver(ctl_path)?;
let ack_fifo = open_fifo_sender(ack_path)?;
let ctl_fifo = open_fifo_receiver(ctl_path)?;

let codec = LengthDelimitedCodec::builder()
.length_field_length(4)
Expand Down Expand Up @@ -162,9 +176,37 @@ impl RunnerFifo {
let mut integration = None;

let current_time = || {
clock_gettime(nix::time::ClockId::CLOCK_MONOTONIC)
.unwrap()
.num_nanoseconds() as u64
// Must match the clock used by instrument-hooks (`instrument_hooks_current_timestamp`)
// so timestamps from this process and the benchmarked process are comparable.
#[cfg(target_os = "macos")]
{
use mach2::mach_time;
use std::sync::OnceLock;

static NANOS_PER_TICK: OnceLock<mach_time::mach_timebase_info> = OnceLock::new();

let nanos_per_tick = NANOS_PER_TICK.get_or_init(|| unsafe {
let mut info = mach_time::mach_timebase_info::default();
let errno = mach_time::mach_timebase_info(&mut info as *mut _);
if errno != 0 || info.denom == 0 {
info.numer = 1;
info.denom = 1;
};
info
});

let time = unsafe { mach_time::mach_absolute_time() };

time * nanos_per_tick.numer as u64 / nanos_per_tick.denom as u64
}

#[cfg(not(target_os = "macos"))]
{
use nix::{sys::time::TimeValLike, time::clock_gettime};

let clock = nix::time::ClockId::CLOCK_MONOTONIC;
clock_gettime(clock).unwrap().num_nanoseconds() as u64
}
};

let mut benchmark_started = false;
Expand Down Expand Up @@ -288,7 +330,7 @@ mod tests {
let ack_path = temp_dir.path().join("ack_fifo");

let mut fifo = RunnerFifo::open(&ctl_path, &ack_path).unwrap();
let mut writer = get_pipe_open_options().open_sender(&ctl_path).unwrap();
let mut writer = open_fifo_sender(&ctl_path).unwrap();

let cmd = FifoCommand::Ack;
let payload = bincode::serialize(&cmd).unwrap();
Expand Down
Loading
Loading