Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add oom monitor for cgroupv2. #257

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ tokio = { workspace = true, features = ["full"] }

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"
nix = { workspace = true, features = ["event"] }
nix = { workspace = true, features = ["event", "inotify"] }
207 changes: 166 additions & 41 deletions crates/runc-shim/src/cgroup_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@
#![cfg(target_os = "linux")]

use std::{
os::unix::io::{AsRawFd, FromRawFd},
collections::HashMap,
mem::size_of,
os::{
fd::AsFd,
unix::io::{AsRawFd, FromRawFd},
},
path::Path,
sync::Arc,
};

use containerd_shim::{
error::{Error, Result},
io_error, other_error,
};
use nix::sys::eventfd::{EfdFlags, EventFd};
use nix::sys::{
eventfd::{EfdFlags, EventFd},
inotify,
};
use tokio::{
fs::{self, read_to_string, File},
io::AsyncReadExt,
sync::mpsc::{self, Receiver},
};

pub const DEFAULT_CGROUPV2_PATH: &str = "/sys/fs/cgroup";

pub async fn get_path_from_cgorup(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(&proc_path)
Expand Down Expand Up @@ -126,6 +137,111 @@ pub async fn register_memory_event(
Ok(receiver)
}

/// Creates an inotify instance that watches a cgroup directory for OOM-related changes.
///
/// Watches for `IN_MODIFY` are registered, in this order, on `memory.events` and
/// `cgroup.events` inside `path`.
///
/// # Errors
///
/// Returns an error if the inotify instance cannot be created or if either watch
/// cannot be added (for example because the file does not exist under `path`).
fn memory_event_fd(path: &Path) -> Result<inotify::Inotify> {
    // IN_CLOEXEC keeps the descriptor from being inherited by any process that is
    // exec'd after this point; the descriptor stays blocking, as before.
    let instance = inotify::Inotify::init(inotify::InitFlags::IN_CLOEXEC)?;

    for file in ["memory.events", "cgroup.events"] {
        instance.add_watch(&path.join(file), inotify::AddWatchFlags::IN_MODIFY)?;
    }

    Ok(instance)
}

async fn parse_kv_file(cg_dir: &Path, file: &str) -> Result<HashMap<String, u32>> {
let path = cg_dir.join(file);
let mut map: HashMap<String, u32> = HashMap::new();

let file_string = read_to_string(path.clone()).await.map_err(io_error!(
e,
"open {}.",
path.to_string_lossy()
))?;

for line in file_string.lines() {
if let Some((key, val)) = line.split_once(" ") {
let val = val.parse::<u32>()?;
map.insert(key.to_string(), val);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More rusty variation of this:

    let map = file_string
        .lines()
        .filter_map(|line| line.split_once(" "))
        .map(|(k, v)| {
            let val = v.parse::<u32>()?;
            Ok((k.to_string(), val))
        })
        .collect::<Result<HashMap<_, _>>>()?;


Ok(map)
}

pub async fn register_memory_event_v2(key: &str, cg_dir: &Path) -> Result<Receiver<String>> {
let (sender, receiver) = mpsc::channel(128);
let cg_dir = Arc::new(Box::from(cg_dir));
let key = key.to_string();

tokio::spawn(async move {
let inotify = memory_event_fd(&cg_dir).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move this before the spawn so the error is handled properly,
and then move the result into the spawned task.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also possibly you could extend the function to return a stream of inotify_event events to simplify the logic of this loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, sorry, I don't follow. Are you suggesting that I should move certain parts of this loop into a separate function in order to simplify it? If that's the case, how would I make the function return a stream of inotify_event? By using a channel for that purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mxpv or to modify here as:

fn new_read_inotify_file_function(inotify_file) {
         nread = read(inotify_file)
        // verify events here
        if nread >= size_of::<libc::inotify_event>()
        ....
}
pub async fn register_memory_event_v2() {
    // create inotify file
    inotify_file = ...
    tokio::spawn() {
        loop {
            // mv all verifications of events to a new function
            if new_read_inotify_file_function(inotify_file) {
                            parse_kv_file()
                            .......
            } 
        }
    }
}

let mut eventfd_file = unsafe { File::from_raw_fd(inotify.as_fd().as_raw_fd()) };
let mut buffer: [u8; 4096] = [0u8; 4096];

let mut lastoom_map: HashMap<String, u32> = HashMap::new();
loop {
let nread = match eventfd_file.read(&mut buffer).await {
Ok(nread) => nread,
Err(_) => return,
};
if nread >= size_of::<libc::inotify_event>() {
match parse_kv_file(&cg_dir, "memory.events").await {
Ok(mem_map) => {
let last_oom_kill = match lastoom_map.get(&key) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: try lastoom_map.get(&key).copied().unwrap_or(0)

Some(v) => v,
None => &0,
};

let oom_kill = match mem_map.get("oom_kill") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

let Some(oom_kill) = mem_map.get("oom_kill") else {
    return;
};

same for below.

Some(v) => v,
None => return,
};

if *oom_kill > *last_oom_kill {
sender.send(key.clone()).await.unwrap();
}
if *oom_kill > 0 {
lastoom_map.insert(key.to_string(), *oom_kill);
}
}
Err(_) => return,
};

let cg_map = match parse_kv_file(&cg_dir, "cgroup.events").await {
Ok(cg_map) => cg_map,
Err(_) => return,
};
match cg_map.get("populated") {
Some(v) if *v == 0 => return,
Some(_) => (),
None => return,
};
}
}
});

Ok(receiver)
}

pub async fn get_path_from_cgorup_v2(pid: u32) -> Result<String> {
let proc_path = format!("/proc/{}/cgroup", pid);
let path_string = read_to_string(&proc_path)
.await
.map_err(io_error!(e, "open {}.", &proc_path))?;

let (_, path) = path_string
.lines()
.nth(0)
.ok_or(Error::Other(
"Error happened while geting the path from cgroup of container process pid.".into(),
))?
.split_once("::")
.ok_or(Error::Other("Failed to parse memory line".into()))?;

Ok(path.to_string())
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand All @@ -140,63 +256,72 @@ mod tests {
use crate::cgroup_memory;

#[tokio::test]
async fn test_cgroupv1_oom_monitor() {
if !is_cgroup2_unified_mode() {
// Create a memory cgroup with limits on both memory and swap.
let path = "cgroupv1_oom_monitor";
let cg = Cgroup::new(hierarchies::auto(), path).unwrap();

let mem_controller: &MemController = cg.controller_of().unwrap();
mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M
mem_controller.set_swappiness(0).unwrap();

// Create a sh sub process, and let it wait for the stdinput.
let mut child_process = Command::new("sh")
.stdin(std::process::Stdio::piped())
.spawn()
.unwrap();
async fn test_cgroup_oom_monitor() {
// Create a memory cgroup with limits on both memory and swap.
let path = "cgroupv1_oom_monitor";
let cg = Cgroup::new(hierarchies::auto(), path).unwrap();

let mem_controller: &MemController = cg.controller_of().unwrap();
mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M
mem_controller.set_swappiness(0).unwrap();

// Create a sh sub process, and let it wait for the stdinput.
let mut child_process = Command::new("sh")
.stdin(std::process::Stdio::piped())
.spawn()
.unwrap();

let pid = child_process.id().unwrap();
let pid = child_process.id().unwrap();

// Add the sh subprocess to the cgroup.
cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap();
// Add the sh subprocess to the cgroup.
cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap();
let mut rx: tokio::sync::mpsc::Receiver<String>;

// Set oom monitor
if !is_cgroup2_unified_mode() {
// Set cgroupv1 oom monitor
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await.unwrap();
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup)
.await
.unwrap();

let mem_cgroup_path = mount_point + &mount_root;
let mut rx = cgroup_memory::register_memory_event(
rx = cgroup_memory::register_memory_event(
pid.to_string().as_str(),
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.unwrap();
} else {
// Set cgroupv2 oom monitor
let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await.unwrap();
let mem_cgroup_path =
cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup;

// Exec the sh subprocess to a dd command that consumes more than 10M of memory.
if let Some(mut stdin) = child_process.stdin.take() {
stdin
.write_all(
b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n",
)
.await
.unwrap();
stdin.flush().await.unwrap();
}

// Wait for the oom message.
if let Some(item) = rx.recv().await {
assert_eq!(pid.to_string(), item, "Receive error oom message");
}
rx = cgroup_memory::register_memory_event_v2(
pid.to_string().as_str(),
Path::new(&mem_cgroup_path),
)
.await
.unwrap();
}
// Exec the sh subprocess to a dd command that consumes more than 10M of memory.
if let Some(mut stdin) = child_process.stdin.take() {
stdin
.write_all(b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n")
.await
.unwrap();
stdin.flush().await.unwrap();
}

// Clean.
child_process.wait().await.unwrap();
cg.delete().unwrap();
remove_file("/tmp/test_oom_monitor_file").await.unwrap();
// Wait for the oom message.
if let Some(item) = rx.recv().await {
assert_eq!(pid.to_string(), item, "Receive error oom message");
}
// Clean.
child_process.wait().await.unwrap();
cg.delete().unwrap();
remove_file("/tmp/test_oom_monitor_file").await.unwrap();
}
}
13 changes: 11 additions & 2 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,31 @@ fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {

#[cfg(target_os = "linux")]
async fn monitor_oom(id: &String, pid: u32, tx: EventSender) -> Result<()> {
// std::thread::sleep(std::time::Duration::from_secs(20));
let rx: Receiver<String>;
if !is_cgroup2_unified_mode() {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await?;
let (mount_root, mount_point) =
cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;

let mem_cgroup_path = mount_point + &mount_root;
let rx = cgroup_memory::register_memory_event(
rx = cgroup_memory::register_memory_event(
id,
Path::new(&mem_cgroup_path),
"memory.oom_control",
)
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;
} else {
let path_from_cgorup = cgroup_memory::get_path_from_cgorup_v2(pid).await?;
let mem_cgroup_path = cgroup_memory::DEFAULT_CGROUPV2_PATH.to_owned() + &path_from_cgorup;

run_oom_monitor(rx, id.to_string(), tx);
rx = cgroup_memory::register_memory_event_v2(id, Path::new(&mem_cgroup_path))
.await
.map_err(other_error!(e, "register_memory_event failed:"))?;
}

run_oom_monitor(rx, id.to_string(), tx);
Ok(())
}

Expand Down
Loading