Skip to content

Commit

Permalink
Async runtimes support
Browse files Browse the repository at this point in the history
  • Loading branch information
svartalf committed Feb 16, 2020
1 parent d0f339d commit efca13c
Show file tree
Hide file tree
Showing 48 changed files with 688 additions and 278 deletions.
4 changes: 4 additions & 0 deletions heim-cpu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ mach = "0.3.2"
heim-derive = { version = "0.0.10", path = "../heim-derive" }
futures-executor = "^0.3"
version-sync = "0.8"
pin-utils = "0.1.0-alpha.4"

[features]
runtime-polyfill = ["heim-runtime/runtime-polyfill"]
runtime-tokio = ["heim-runtime/runtime-tokio"]
runtime-async-std = ["heim-runtime/runtime-async-std"]
runtime-gio = ["heim-runtime/runtime-gio"]
3 changes: 2 additions & 1 deletion heim-cpu/examples/cpu_frequencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use heim_cpu as cpu;

#[cfg(target_os = "linux")]
async fn linux_frequencies() -> Result<()> {
let mut frequencies = cpu::os::linux::frequencies();
let frequencies = cpu::os::linux::frequencies();
pin_utils::pin_mut!(frequencies);
while let Some(freq) = frequencies.next().await {
dbg!(freq?);
}
Expand Down
3 changes: 2 additions & 1 deletion heim-cpu/examples/cpu_times.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use heim_cpu as cpu;
async fn main() -> Result<()> {
dbg!(cpu::time().await?);

let mut times = cpu::times();
let times = cpu::times();
pin_utils::pin_mut!(times);
while let Some(time) = times.next().await {
dbg!(time?);
}
Expand Down
2 changes: 1 addition & 1 deletion heim-cpu/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! CPU information.
//!
//! This module is enabled with the `cpu` feature flag (enabled by default).
//! This module is enabled with the `cpu` feature flag (disabled by default).

#![doc(html_root_url = "https://docs.rs/heim-cpu/0.0.10")]
#![deny(
Expand Down
56 changes: 37 additions & 19 deletions heim-cpu/src/sys/linux/count/logical.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,51 @@
use heim_common::prelude::*;
use heim_runtime::fs;
use heim_runtime::prelude::*;

fn sysconf() -> impl Future<Output = Result<u64>> {
fn sysconf() -> Result<u64> {
let result = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) };

if result < 0 {
future::err(Error::last_os_error())
Err(Error::last_os_error())
} else {
future::ok(result as u64)
Ok(result as u64)
}
}

fn cpuinfo() -> impl Future<Output = Result<u64>> {
fs::read_lines("/proc/cpuinfo")
.try_filter(|line| future::ready(line.starts_with("processor")))
.map_err(Error::from)
.try_fold(0, |acc, _| future::ok(acc + 1))
async fn cpuinfo() -> Result<u64> {
let mut lines = rt::fs::read_lines("/proc/cpuinfo").await?;
let mut count = 0;
while let Some(line) = lines.next().await {
let line = line?;
if line.starts_with("processor") {
count += 1;
}
}

Ok(count)
}

fn stat() -> impl Future<Output = Result<u64>> {
fs::read_lines("/proc/stat")
.try_filter(|line| future::ready(line.starts_with("cpu")))
.map_err(Error::from)
// the first "cpu" line aggregates the numbers in all
// of the other "cpuN" lines, hence skip the first item
.skip(1)
.try_fold(0, |acc, _| future::ok(acc + 1))
async fn stat() -> Result<u64> {
// the first "cpu" line aggregates the numbers in all
// of the other "cpuN" lines, hence skip the first item
let mut lines = rt::fs::read_lines("/proc/stat").await?.skip(1);

let mut count = 0;
while let Some(line) = lines.next().await {
let line = line?;
if line.starts_with("cpu") {
count += 1;
}
}

Ok(count)
}

pub fn logical_count() -> impl Future<Output = Result<u64>> {
sysconf().or_else(|_| cpuinfo()).or_else(|_| stat())
pub async fn logical_count() -> Result<u64> {
match sysconf() {
Ok(value) => Ok(value),
Err(..) => match cpuinfo().await {
Ok(value) => Ok(value),
Err(..) => stat().await,
},
}
}
152 changes: 70 additions & 82 deletions heim-cpu/src/sys/linux/count/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,45 @@ use std::os::unix::ffi::OsStrExt;
use std::str;

use heim_common::prelude::*;
use heim_runtime::fs;
use heim_runtime::prelude::*;

fn topology() -> impl Future<Output = Result<u64>> {
let acc = HashSet::<u64>::new();
fs::read_dir("/sys/devices/system/cpu/")
.try_filter_map(|entry| {
let matched = match entry.path().file_name() {
Some(name) if name.as_bytes().starts_with(b"cpu") => {
// Safety: since it will be used with Linux only,
// it is okay to assume that /sys files will has the UTF-8 names
let core_id = unsafe { str::from_utf8_unchecked(&name.as_bytes()[3..]) };
async fn topology() -> Result<u64> {
let mut acc = HashSet::<u64>::new();
let mut entries = rt::fs::read_dir("/sys/devices/system/cpu/").await?;
while let Some(entry) = entries.next().await {
let entry = entry?;

match core_id.parse::<u64>() {
Ok(..) => Some(entry),
_ => None,
}
// TODO: Whole block looks ugly, rewrite it.
// What it does: checks if entry name conforms to `cpu\d+` pattern.
match entry.path().file_name() {
Some(name) if name.as_bytes().starts_with(b"cpu") => {
// Safety: since it will be used with Linux only,
// for it is okay to assume that /sys files will has the UTF-8 names.
// TODO: Make it safe instead.
let core_id = unsafe { str::from_utf8_unchecked(&name.as_bytes()[3..]) };

match core_id.parse::<u64>() {
Ok(..) => {}
_ => continue,
}
_ => None,
};
}
_ => continue,
};

future::ok(matched)
})
.and_then(|entry| {
let path = entry.path().join("topology/core_id");
let path = entry.path().join("topology/core_id");
let contents = rt::fs::read_to_string(path).await?;
let cpu_id = contents.trim().parse()?;

fs::read_to_string(path)
})
.map_err(Error::from)
.and_then(|contents| future::ready(contents.trim().parse::<u64>().map_err(Error::from)))
.try_fold(acc, |mut acc, cpu_id| {
let _ = acc.insert(cpu_id);
let _ = acc.insert(cpu_id);
}

future::ok(acc)
})
.and_then(|acc| {
if !acc.is_empty() {
future::ok(acc.len() as u64)
} else {
// This error will not be propagated to caller,
// since `physical_count` will call `or_else()` on it
future::err(Error::incompatible("Unable to fetch CPU topology"))
}
})
if !acc.is_empty() {
Ok(acc.len() as u64)
} else {
// This error will not be propagated to caller,
// since `physical_count` will call `or_else()` on it
Err(Error::incompatible("Unable to fetch CPU topology"))
}
}

#[derive(Default)]
Expand All @@ -62,53 +58,45 @@ fn parse_line(line: &str) -> Result<u64> {
.and_then(|value| value.parse::<u64>().map_err(Error::from))
}

fn cpu_info() -> impl Future<Output = Result<Option<u64>>> {
let acc = Collector::default();

fs::read_lines("/proc/cpuinfo")
.map_err(Error::from)
.try_fold(acc, |mut acc, line| {
let result = match &line {
l if l.starts_with("physical id") => match parse_line(l.as_str()) {
Ok(physical_id) if acc.physical_id.is_none() => {
acc.physical_id = Some(physical_id);

Ok(acc)
}
Ok(..) => {
panic!("Missed the core id value in the /proc/cpuinfo!");
}
Err(e) => Err(e),
},
l if l.starts_with("core id") => match parse_line(l.as_str()) {
Ok(core_id) if acc.physical_id.is_some() => {
let physical_id = acc
.physical_id
.take()
.expect("Will not happen, match guard covers that");
let _ = acc.group.insert((physical_id, core_id));
async fn cpu_info() -> Result<Option<u64>> {
let mut acc = Collector::default();

Ok(acc)
}
Ok(..) => {
panic!("Missed the physical id value in the /proc/cpuinfo!");
}
Err(e) => Err(e),
},
_ => Ok(acc),
};

future::ready(result)
})
.map_ok(|acc| {
if !acc.group.is_empty() {
Some(acc.group.len() as u64)
} else {
None
let mut lines = rt::fs::read_lines("/proc/cpuinfo").await?;
while let Some(line) = lines.next().await {
match &line? {
l if l.starts_with("physical id") => {
let core_id = parse_line(l.as_str())?;
if acc.physical_id.is_none() {
acc.physical_id = Some(core_id)
} else {
// TODO: In general it seems better to return an error
panic!("Missed the core id value in the /proc/cpuinfo, implementation bug");
}
}
})
l if l.starts_with("core id") => {
let core_id = parse_line(l.as_str())?;
if acc.physical_id.is_some() {
let physical_id = acc.physical_id.take().expect("Not expected to be happen");
let _ = acc.group.insert((physical_id, core_id));
} else {
// TODO: In general it seems better to return an error
panic!("Missed the physical id value in the /proc/cpuinfo!");
}
}
_ => continue,
}
}

if !acc.group.is_empty() {
Ok(Some(acc.group.len() as u64))
} else {
Ok(None)
}
}

pub fn physical_count() -> impl Future<Output = Result<Option<u64>>> {
topology().map_ok(Some).or_else(|_| cpu_info())
pub async fn physical_count() -> Result<Option<u64>> {
match topology().await {
Ok(count) => Ok(Some(count)),
Err(..) => cpu_info().await,
}
}
44 changes: 24 additions & 20 deletions heim-cpu/src/sys/linux/freq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};

use heim_common::prelude::*;
use heim_common::units::{frequency, Frequency};
use heim_runtime::fs;
use heim_runtime::prelude::*;

#[derive(Debug, Default)]
pub struct CpuFrequency {
Expand Down Expand Up @@ -87,7 +87,8 @@ pub fn frequencies() -> impl Stream<Item = Result<CpuFrequency>> {

// TODO: https://github.com/giampaolo/psutil/issues/1269

fs::read_dir("/sys/devices/system/cpu/")
rt::fs::read_dir("/sys/devices/system/cpu/")
.try_flatten_stream()
.map_err(Error::from)
.try_filter(|entry| {
let name = entry.file_name();
Expand All @@ -102,31 +103,32 @@ pub fn frequencies() -> impl Stream<Item = Result<CpuFrequency>> {
.map_ok(|entry| entry.path().join("cpufreq"))
.try_filter(|path| {
// TODO: Get rid of the `.clone()`
fs::path_exists(path.clone())
rt::fs::path_exists(path.clone())
})
.and_then(|path| {
.and_then(|path| async move {
let current = current_freq(&path);
let max = max_freq(&path);
let min = min_freq(&path);

future::try_join3(current, max, min)
// TODO: Use join provided by `heim_runtime`
future::try_join3(current, max, min).await
})
.and_then(|(current, max, min)| future::ok(CpuFrequency { current, max, min }))
}

#[allow(clippy::redundant_closure)]
fn read_freq(path: PathBuf) -> impl Future<Output = Result<Frequency>> {
fs::read_to_string(path)
.map_err(Error::from)
.and_then(|value| future::ready(value.trim_end().parse::<u64>().map_err(Error::from)))
.map_ok(Frequency::new::<frequency::kilohertz>)
async fn read_freq(path: PathBuf) -> Result<Frequency> {
let contents = rt::fs::read_to_string(path).await?;
let value = contents.trim_end().parse::<u64>()?;

Ok(Frequency::new::<frequency::kilohertz>(value))
}

fn current_freq(path: &Path) -> impl Future<Output = Result<Frequency>> {
async fn current_freq(path: &Path) -> Result<Frequency> {
// TODO: Wait for Future' `try_select_all` and uncomment the block below
// Ref: https://github.com/rust-lang-nursery/futures-rs/pull/1557

read_freq(path.join("scaling_cur_freq"))
read_freq(path.join("scaling_cur_freq")).await

// let one = read_freq(path.join("scaling_cur_freq"))
// .into_future().fuse();
Expand All @@ -141,14 +143,16 @@ fn current_freq(path: &Path) -> impl Future<Output = Result<Frequency>> {
// future::ready(result)
}

fn max_freq(path: &Path) -> impl Future<Output = Result<Option<Frequency>>> {
read_freq(path.join("scaling_max_freq"))
.into_future()
.map(|value| Ok(value.ok()))
async fn max_freq(path: &Path) -> Result<Option<Frequency>> {
let value = read_freq(path.join("scaling_max_freq")).await;

// Don't care about errors propagation at this point
Ok(value.ok())
}

fn min_freq(path: &Path) -> impl Future<Output = Result<Option<Frequency>>> {
read_freq(path.join("scaling_min_freq"))
.into_future()
.map(|value| Ok(value.ok()))
async fn min_freq(path: &Path) -> Result<Option<Frequency>> {
let value = read_freq(path.join("scaling_min_freq")).await;

// Don't care about errors propagation at this point
Ok(value.ok())
}
4 changes: 2 additions & 2 deletions heim-cpu/src/sys/linux/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use heim_common::prelude::*;
use heim_runtime::fs;
use heim_runtime::prelude::*;

#[derive(Debug, Default)]
pub struct CpuStats {
Expand Down Expand Up @@ -57,5 +57,5 @@ impl FromStr for CpuStats {
}

pub fn stats() -> impl Future<Output = Result<CpuStats>> {
fs::read_into("/proc/stat")
rt::fs::read_into("/proc/stat")
}

0 comments on commit efca13c

Please sign in to comment.