Skip to content

Commit

Permalink
Feature: Add oom monitor for cgroupv2.
Browse files Browse the repository at this point in the history
  • Loading branch information
aa624545345 committed Apr 3, 2024
1 parent c696ce4 commit 7300ca1
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 5 deletions.
2 changes: 1 addition & 1 deletion crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ tokio = { workspace = true, features = ["full"] }

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"
nix = { workspace = true, features = ["event"] }
nix = { workspace = true, features = ["event", "inotify"] }
121 changes: 119 additions & 2 deletions crates/runc-shim/src/cgroup_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,34 @@

#![cfg(target_os = "linux")]

use std::sync::Arc;
use std::{
os::unix::io::{AsRawFd, FromRawFd},
mem::size_of,
os::{
fd::AsFd,
unix::io::{AsRawFd, FromRawFd},
},
path::Path,
};

use containerd_shim::{
error::{Error, Result},
io_error, other_error,
};
use nix::sys::eventfd::{EfdFlags, EventFd};
use nix::sys::{
eventfd::{EfdFlags, EventFd},
inotify,
};
use tokio::{
fs::{self, read_to_string, File},
io::AsyncReadExt,
sync::mpsc::{self, Receiver},
};

use std::collections::HashMap;

pub const DEFAULT_CGROUPV2_PATH: &str = "/sys/fs/cgroup";

pub async fn get_path_from_cgorup(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(&proc_path)
Expand Down Expand Up @@ -126,6 +138,111 @@ pub async fn register_memory_event(
Ok(receiver)
}

/// Builds an inotify instance that watches the cgroup's `memory.events` and
/// `cgroup.events` files (both located directly under `path`) for modification.
///
/// # Errors
/// Fails if the inotify instance cannot be initialised or if either watch
/// cannot be added.
fn memory_event_fd(path: &Path) -> Result<inotify::Inotify> {
    let watcher = inotify::Inotify::init(inotify::InitFlags::empty())?;

    // Same order as before: memory.events first, then cgroup.events.
    for watched in ["memory.events", "cgroup.events"] {
        watcher.add_watch(&path.join(watched), inotify::AddWatchFlags::IN_MODIFY)?;
    }

    Ok(watcher)
}

/// Reads `file` inside `cg_dir` and parses it as a flat `key value` table.
///
/// Each line is split at its first space; the left part becomes the key and
/// the right part is parsed as a `u32`. Lines that contain no space are
/// ignored. If a key occurs more than once, the last occurrence wins.
///
/// # Errors
/// Fails if the file cannot be read or if any value is not a valid `u32`.
async fn parse_kv_file(cg_dir: &Path, file: &str) -> Result<HashMap<String, u32>> {
    let target = cg_dir.join(file);
    let contents = read_to_string(&target).await.map_err(io_error!(
        e,
        "open {}.",
        target.to_string_lossy()
    ))?;

    let mut entries = HashMap::new();
    for (name, raw) in contents.lines().filter_map(|line| line.split_once(" ")) {
        entries.insert(name.to_string(), raw.parse::<u32>()?);
    }

    Ok(entries)
}

pub async fn register_memory_event_v2(key: &str, cg_dir: &Path) -> Result<Receiver<String>> {
let (sender, receiver) = mpsc::channel(128);
let cg_dir = Arc::new(Box::from(cg_dir));
let key = key.to_string();

tokio::spawn(async move {
let inotify = memory_event_fd(&cg_dir).unwrap();
let mut eventfd_file = unsafe { File::from_raw_fd(inotify.as_fd().as_raw_fd()) };
let mut buffer: [u8; 4096] = [0u8; 4096];

let mut lastoom_map: HashMap<String, u32> = HashMap::new();
loop {
let nread = match eventfd_file.read(&mut buffer).await {
Ok(nread) => nread,
Err(_) => return,
};
if nread >= size_of::<libc::inotify_event>() {
match parse_kv_file(&cg_dir, "memory.events").await {
Ok(mem_map) => {
let last_oom_kill = match lastoom_map.get(&key) {
Some(v) => v,
None => &0,
};

let oom_kill = match mem_map.get("oom_kill") {
Some(v) => v,
None => return,
};

if *oom_kill > *last_oom_kill {
sender.send(key.clone()).await.unwrap();
}
if *oom_kill > 0 {
lastoom_map.insert(key.to_string(), *oom_kill);
}
}
Err(_) => return,
};

let cg_map = match parse_kv_file(&cg_dir, "cgroup.events").await {
Ok(cg_map) => cg_map,
Err(_) => return,
};
match cg_map.get("populated") {
Some(v) if *v == 0 => return,
Some(_) => (),
None => return,
};
}
}
});

Ok(receiver)
}

pub async fn get_path_from_cgorup_v2(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(&proc_path)
.await
.map_err(io_error!(e, "open {}.", &proc_path))?;

let (_, path) = path_string
.lines()
.nth(0)
.ok_or(Error::Other(
"Error happened while geting the path from cgroup of container process pid.".into(),
))?
.split_once("::")
.ok_or(Error::Other("Failed to parse memory line".into()))?;

Ok(path.to_string())
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand Down
13 changes: 11 additions & 2 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,31 @@ fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {

#[cfg(target_os = "linux")]
async fn monitor_oom(id: &String, pid: u32, tx: EventSender) -> Result<()> {
// std::thread::sleep(std::time::Duration::from_secs(20));
let rx: Receiver<String>;
if !is_cgroup2_unified_mode() {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await?;
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;

let mem_cgroup_path = mount_point + &mount_root;
let rx = cgroup_memory::register_memory_event(
rx = cgroup_memory::register_memory_event(
id,
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;
} else {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await?;
let mem_cgroup_path = cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup;

run_oom_monitor(rx, id.to_string(), tx);
rx = cgroup_memory::register_memory_event_v2(id, Path::new(&mem_cgroup_path))
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;
}

run_oom_monitor(rx, id.to_string(), tx);
Ok(())
}

Expand Down

0 comments on commit 7300ca1

Please sign in to comment.