Disabling HyperThreading: Worker start option --cpus="no-ht" #260

Merged
merged 3 commits into from
Dec 13, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@
Using this command you can quickly test if PBS/Slurm will accept allocations created with
the provided parameters.
* Workers can be started with `--cpus="no-ht"`, which detects CPUs but ignores HyperThreading
(for each physical core, all HT virtual cores except the first are ignored)
* You can now specify the timelimit of PBS/Slurm allocations using the `HH:MM:SS` format:
`hq alloc add pbs --time-limit 01:10:30`.
* Improve error messages printed when an invalid CLI parameter is entered.
24 changes: 18 additions & 6 deletions crates/hyperqueue/src/bin/hq.rs
@@ -32,7 +32,7 @@ use hyperqueue::server::bootstrap::{
use hyperqueue::transfer::messages::{
FromClientMessage, JobInfoRequest, Selector, ToClientMessage,
};
use hyperqueue::worker::hwdetect::{detect_cpus, detect_generic_resource};
use hyperqueue::worker::hwdetect::{detect_cpus, detect_cpus_no_ht, detect_generic_resource};
use hyperqueue::worker::start::{start_hq_worker, WorkerStartOpts};
use hyperqueue::WorkerId;
use tako::common::resources::ResourceDescriptor;
@@ -224,6 +224,13 @@ struct WorkerInfoOpts {
worker_id: WorkerId,
}

#[derive(Parser)]
struct HwDetectOpts {
    /// Detect only physical cores
    #[clap(long)]
    no_hyperthreading: bool,
}

#[derive(Parser)]
enum WorkerCommand {
/// Start worker
@@ -233,7 +240,8 @@ enum WorkerCommand {
/// Display information about workers
List(WorkerListOpts),
/// Hwdetect
Hwdetect,
#[clap(name = "hwdetect")]
HwDetect(HwDetectOpts),
/// Display information about a specific worker
Info(WorkerInfoOpts),
/// Display worker's hostname
@@ -384,8 +392,12 @@ async fn command_resubmit(gsettings: GlobalSettings, opts: ResubmitOpts) -> anyh
resubmit_computation(&gsettings, &mut connection, opts).await
}

fn command_worker_hwdetect(gsettings: GlobalSettings) -> anyhow::Result<()> {
let cpus = detect_cpus()?;
fn command_worker_hwdetect(gsettings: GlobalSettings, opts: HwDetectOpts) -> anyhow::Result<()> {
    let cpus = if opts.no_hyperthreading {
        detect_cpus_no_ht()?
    } else {
        detect_cpus()?
    };
    let generic = detect_generic_resource()?;
    gsettings
        .printer()
@@ -532,8 +544,8 @@ async fn main() -> hyperqueue::Result<()> {
subcmd: WorkerCommand::Info(opts),
}) => command_worker_info(gsettings, opts).await,
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::Hwdetect,
}) => command_worker_hwdetect(gsettings),
subcmd: WorkerCommand::HwDetect(opts),
}) => command_worker_hwdetect(gsettings, opts),
SubCommand::Worker(WorkerOpts {
subcmd: WorkerCommand::Address(opts),
}) => command_worker_address(gsettings, opts).await,
29 changes: 29 additions & 0 deletions crates/hyperqueue/src/worker/hwdetect.rs
@@ -26,6 +26,26 @@ pub fn detect_cpus() -> anyhow::Result<CpusDescriptor> {
})
}

pub fn detect_cpus_no_ht() -> anyhow::Result<CpusDescriptor> {
    let descriptor = detect_cpus()?;
    let mut new_desc = Vec::new();
    for socket in descriptor {
        let mut new_socket = Vec::new();
        for cpu_id in socket {
            if read_linux_thread_siblings(cpu_id)?
                .iter()
                .min()
                .ok_or_else(|| anyhow::anyhow!("Thread siblings are empty"))
                .map(|v| *v == cpu_id)?
            {
                new_socket.push(cpu_id);
            }
        }
        new_desc.push(new_socket);
    }
    Ok(new_desc)
}
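
For readers following the diff, the filtering rule in `detect_cpus_no_ht` can be tried out without reading `/sys`. The sketch below is not the PR's code. It assumes plain `u32` ids instead of `CpuId`, a `String` error instead of `anyhow`, and it takes the sibling lookup as a closure (`siblings_of`, a hypothetical stand-in for `read_linux_thread_siblings`). The topology in `example_siblings` is made up for illustration.

```rust
/// CPU ids grouped by socket, mirroring the shape of `CpusDescriptor` in the diff
/// (assumption: plain `u32` ids instead of the project's `CpuId`).
type Sockets = Vec<Vec<u32>>;

/// Keeps only the CPUs that have the lowest id among their thread siblings.
/// `siblings_of` is a hypothetical stand-in for `read_linux_thread_siblings`.
fn keep_first_siblings<F>(sockets: Sockets, siblings_of: F) -> Result<Sockets, String>
where
    F: Fn(u32) -> Result<Vec<u32>, String>,
{
    let mut result = Vec::with_capacity(sockets.len());
    for socket in sockets {
        let mut kept = Vec::new();
        for cpu_id in socket {
            let siblings = siblings_of(cpu_id)?;
            // An empty sibling list is treated as an error, as in the diff.
            let first = siblings
                .iter()
                .min()
                .ok_or_else(|| format!("Thread siblings of CPU {} are empty", cpu_id))?;
            if *first == cpu_id {
                kept.push(cpu_id);
            }
        }
        // The socket is kept even if every CPU in it was filtered out.
        result.push(kept);
    }
    Ok(result)
}

/// Made-up topology: 4 physical cores with 2 threads each,
/// where CPU n and CPU n + 4 share a physical core.
fn example_siblings(cpu_id: u32) -> Result<Vec<u32>, String> {
    let core = cpu_id % 4;
    Ok(vec![core, core + 4])
}
```

Under this made-up topology, calling `keep_first_siblings(vec![(0..8).collect()], example_siblings)` keeps CPUs 0 to 3 and drops 4 to 7, because each of the latter has a sibling with a lower id.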

pub fn detect_generic_resource() -> anyhow::Result<Vec<GenericResourceDescriptor>> {
let mut generic = Vec::new();
if let Ok(count) = read_linux_gpu_count() {
@@ -64,6 +84,15 @@ fn read_linux_numa() -> anyhow::Result<Vec<Vec<CpuId>>> {
Ok(numa_nodes)
}

fn read_linux_thread_siblings(cpu_id: CpuId) -> anyhow::Result<Vec<CpuId>> {
    let filename = format!(
        "/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list",
        cpu_id
    );
    log::debug!("Reading {}", filename);
    parse_range(&std::fs::read_to_string(filename)?)
}
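
The `thread_siblings_list` file typically holds a short list such as `0,4` or `0-1`. The sketch below shows one way such a string could be turned into ids, assuming the content consists only of comma-separated ids and inclusive `a-b` ranges. `parse_cpu_list` is a hypothetical name and is not the project's `parse_range`, whose exact grammar is not visible in this diff.

```rust
/// Parses a list such as "0,4" or "0-1" into CPU ids.
/// Assumption: only comma-separated ids and inclusive `a-b` ranges occur.
fn parse_cpu_list(input: &str) -> Result<Vec<u32>, String> {
    let mut ids = Vec::new();
    for part in input.trim().split(',') {
        let part = part.trim();
        if part.is_empty() {
            // Tolerate empty input and stray commas.
            continue;
        }
        match part.split_once('-') {
            Some((start, end)) => {
                let start = start
                    .trim()
                    .parse::<u32>()
                    .map_err(|e| format!("invalid range start in {:?}: {}", part, e))?;
                let end = end
                    .trim()
                    .parse::<u32>()
                    .map_err(|e| format!("invalid range end in {:?}: {}", part, e))?;
                if start > end {
                    return Err(format!("descending range {:?}", part));
                }
                ids.extend(start..=end);
            }
            None => {
                let id = part
                    .parse::<u32>()
                    .map_err(|e| format!("invalid CPU id {:?}: {}", part, e))?;
                ids.push(id);
            }
        }
    }
    Ok(ids)
}
```

In this sketch an empty file yields an empty vector rather than an error, which is the case the "Thread siblings are empty" check in `detect_cpus_no_ht` guards against.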

fn p_cpu_range(input: &str) -> NomResult<Vec<CpuId>> {
map_res(
tuple((
27 changes: 22 additions & 5 deletions crates/hyperqueue/src/worker/parser.rs
@@ -23,11 +23,23 @@ fn p_cpu_definition(input: &str) -> NomResult<CpusDescriptor> {
)(input)
}

fn parse_cpu_definition(input: &str) -> anyhow::Result<CpusDescriptor> {
consume_all(p_cpu_definition, input)
#[derive(Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum CpuDefinition {
    Detect,
    DetectNoHyperThreading,
    Custom(CpusDescriptor),
}

arg_wrapper!(ArgCpuDef, CpusDescriptor, parse_cpu_definition);
fn parse_cpu_definition(input: &str) -> anyhow::Result<CpuDefinition> {
    match input {
        "auto" => Ok(CpuDefinition::Detect),
        "no-ht" => Ok(CpuDefinition::DetectNoHyperThreading),
        other => consume_all(p_cpu_definition, other).map(CpuDefinition::Custom),
    }
}
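
The keyword-first dispatch above can be exercised in isolation. The following is a minimal sketch, not the PR's parser: `parse_custom` is a hypothetical replacement for the nom-based `p_cpu_definition` that only understands `N` and `MxN`, ids are plain `u32`, and errors are `String`s. The numbering of cores across sockets follows the expectations in the PR's `test_parse_cpu_def` test.

```rust
#[derive(Debug, PartialEq, Eq)]
enum CpuDefinition {
    Detect,
    DetectNoHyperThreading,
    Custom(Vec<Vec<u32>>),
}

/// Hypothetical stand-in for the nom parser: accepts "N" (one socket with N cores)
/// or "MxN" (M sockets with N cores each), numbering cores consecutively.
fn parse_custom(input: &str) -> Result<Vec<Vec<u32>>, String> {
    let parse_num = |s: &str| {
        s.trim()
            .parse::<u32>()
            .map_err(|e| format!("invalid number {:?}: {}", s, e))
    };
    let (sockets, cores) = match input.split_once('x') {
        Some((s, c)) => (parse_num(s)?, parse_num(c)?),
        None => (1, parse_num(input)?),
    };
    // Reject definitions whose total core count does not fit into u32.
    sockets
        .checked_mul(cores)
        .ok_or_else(|| format!("too many CPUs in {:?}", input))?;
    Ok((0..sockets)
        .map(|s| (s * cores..(s + 1) * cores).collect())
        .collect())
}

/// Keywords are matched first; anything else falls through to the numeric grammar.
fn parse_cpu_definition(input: &str) -> Result<CpuDefinition, String> {
    match input {
        "auto" => Ok(CpuDefinition::Detect),
        "no-ht" => Ok(CpuDefinition::DetectNoHyperThreading),
        other => parse_custom(other).map(CpuDefinition::Custom),
    }
}
```

Because the keywords are compared exactly, a value such as `No-HT` is not recognized as a keyword in this sketch and is rejected by the numeric parser instead.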

arg_wrapper!(ArgCpuDefinition, CpuDefinition, parse_cpu_definition);
arg_wrapper!(
ArgGenericResourceDef,
GenericResourceDescriptor,
@@ -95,12 +107,17 @@ mod test {
fn test_parse_cpu_def() {
assert_eq!(
parse_cpu_definition("4").unwrap(),
vec![vec![0, 1, 2, 3].to_ids()]
CpuDefinition::Custom(vec![vec![0, 1, 2, 3].to_ids()]),
);
assert_eq!(
parse_cpu_definition("2x3").unwrap(),
vec![vec![0, 1, 2].to_ids(), vec![3, 4, 5].to_ids()]
CpuDefinition::Custom(vec![vec![0, 1, 2].to_ids(), vec![3, 4, 5].to_ids()]),
);
assert_eq!(
parse_cpu_definition("no-ht").unwrap(),
CpuDefinition::DetectNoHyperThreading,
);
assert_eq!(parse_cpu_definition("auto").unwrap(), CpuDefinition::Detect,);
}

#[test]
19 changes: 10 additions & 9 deletions crates/hyperqueue/src/worker/start.rs
@@ -34,8 +34,8 @@ use crate::common::serverdir::ServerDir;
use crate::common::timeutils::ArgDuration;
use crate::transfer::messages::TaskBody;
use crate::transfer::stream::ChannelId;
use crate::worker::hwdetect::{detect_cpus, detect_generic_resource};
use crate::worker::parser::{ArgCpuDef, ArgGenericResourceDef};
use crate::worker::hwdetect::{detect_cpus, detect_cpus_no_ht, detect_generic_resource};
use crate::worker::parser::{ArgCpuDefinition, ArgGenericResourceDef, CpuDefinition};
use crate::worker::streamer::StreamSender;
use crate::worker::streamer::StreamerRef;
use crate::Map;
@@ -76,9 +76,9 @@ impl FromStr for ManagerOpts {

#[derive(Parser)]
pub struct WorkerStartOpts {
/// How many cores should be allocated for the worker
#[clap(long)]
cpus: Option<ArgCpuDef>,
/// How many cores should be allocated for the worker (auto, no-ht, N, MxN)
#[clap(long, default_value = "auto")]
cpus: ArgCpuDefinition,

/// Resources
#[clap(long, setting = clap::ArgSettings::MultipleOccurrences)]
@@ -481,10 +481,11 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
.expect("Invalid hostname")
});

let cpus = opts
.cpus
.map(|x| Ok(x.unpack()))
.unwrap_or_else(detect_cpus)?;
    let cpus = match opts.cpus.unpack() {
        CpuDefinition::Detect => detect_cpus()?,
        CpuDefinition::DetectNoHyperThreading => detect_cpus_no_ht()?,
        CpuDefinition::Custom(cpus) => cpus,
    };

    let mut generic = if opts.no_detect_resources {
        Vec::new()
17 changes: 11 additions & 6 deletions docs/jobs/cresources.md
@@ -82,23 +82,23 @@ Note: Specifying policy has effect only if you have more then one sockets(physic

* Compact (``compact``) - Tries to allocate cores on as few sockets as possible in the current worker state.

Example: ``hq --cpus="8 compact" ...``
Example: ``hq submit --cpus="8 compact" ...``

* Strict Compact (``compact!``) - Always allocates cores on as few sockets as possible for a target node. The task is not executed until the requirement can be fully fulfilled. E.g., if your worker has 4 cores per socket and you ask for 4 CPUs, the task will always be executed on a single socket. If you ask for 8 CPUs, it will always be executed on two sockets.

Example: ``hq --cpus="8 compact!" ...``
Example: ``hq submit --cpus="8 compact!" ...``

* Scatter (``scatter``) - Allocates cores across as many sockets as possible in the current worker state. If your worker has 4 sockets with 8 cores per socket and you ask for 8 CPUs, then HQ tries, if possible in the current situation, to run the process with 2 CPUs on each socket.

Example: ``hq --cpus="8 scatter" ...``
Example: ``hq submit --cpus="8 scatter" ...``


The default policy is the compact policy, i.e. ``--cpus=XX`` is equivalent to ``--cpus="XX compact"``


## CPU requests and job arrays

Resource requests are applied to each task of job. For example, if you submit the following: ``hq --cpus=2 --array=1-10`` it will create 10 tasks where each task needs two CPUs.
Resource requests are applied to each task of a job. For example, if you submit the following: ``hq submit --cpus=2 --array=1-10``, it will create 10 tasks where each task needs two CPUs.


## CPUs configuration
@@ -107,13 +107,18 @@ Worker automatically detect number of CPUs and on Linux system it also detects p
it should work without any manual intervention. If you want to see how your machine is seen by a worker without actually starting one,
you can run ``$ hq worker hwdetect``, which only prints the CPU layout.


### Manual specification of CPU configuration

If automatic detection fails, or you want to manually configure set CPU configuration, you can use
If automatic detection fails, or you want to set the CPU configuration manually, you can use the
``--cpus`` parameter, for example as follows:

- 8 CPUs for worker
``$ hq worker start --cpus=8``

- 2 sockets with 12 cores per socket
``$ hq worker --cpus=2x12``
``$ hq worker start --cpus=2x12``

- Automatic detection of CPUs that ignores HyperThreading
(only the first virtual core of each physical core is detected)
``$ hq worker start --cpus="no-ht"``
5 changes: 5 additions & 0 deletions tests/test_output.py
@@ -99,6 +99,11 @@ def test_print_hw(hq_env: HqEnv):
output = hq_json_wrapper(hq_env, ["--output-type=json", "worker", "hwdetect"])
assert isinstance(output, dict)
assert "cpus" in output.keys()
output = hq_json_wrapper(
hq_env, ["--output-type=json", "worker", "hwdetect", "--no-hyperthreading"]
)
assert isinstance(output, dict)
assert "cpus" in output.keys()


"""