diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml index e65ae7ef..5b843aaa 100644 --- a/crates/runc-shim/Cargo.toml +++ b/crates/runc-shim/Cargo.toml @@ -29,7 +29,7 @@ containerd-shim = { path = "../shim", version = "0.6.0", features = ["async"] } crossbeam = "0.8.1" libc.workspace = true log.workspace = true -nix = { workspace = true, features = ["socket", "uio", "term"] } +nix = { workspace = true, features = ["socket", "uio", "term", "event"] } oci-spec.workspace = true runc = { path = "../runc", version = "0.2.0", features = ["async"] } serde.workspace = true @@ -37,6 +37,9 @@ serde_json.workspace = true time.workspace = true uuid.workspace = true +[target.'cfg(target_os = "linux")'.dependencies] +cgroups-rs = "0.3.3" + # Async dependencies async-trait.workspace = true futures.workspace = true diff --git a/crates/runc-shim/src/cgroup_memory.rs b/crates/runc-shim/src/cgroup_memory.rs new file mode 100644 index 00000000..635ab247 --- /dev/null +++ b/crates/runc-shim/src/cgroup_memory.rs @@ -0,0 +1,184 @@ +use containerd_shim::{ + error::{Error, Result}, + io_error, other_error, +}; +use std::path::Path; +use tokio::{ + fs::{self, read_to_string, File}, + io::AsyncReadExt, + sync::mpsc::{self, Receiver}, +}; + +use nix::sys::eventfd; +use std::os::unix::io::{AsRawFd, FromRawFd}; + +pub async fn get_path_from_cgorup(pid: u32) -> Result { + let proc_path = format!("/proc/{}/cgroup", pid); + let path_string = read_to_string(proc_path.clone()).await.map_err(io_error!( + e, + "open {}.", + proc_path.clone() + ))?; + + let (_, path) = path_string + .lines() + .find(|line| line.contains("memory")) + .ok_or(Error::Other("Memory line not found".into()))? + .split_once(":memory:") + .ok_or(Error::Other("Failed to parse memory line".into()))?; + + Ok(path.to_string()) +} + +pub async fn get_existing_cgroup_mem_path(pid_path: String) -> Result<(String, String)> { + let (mut mount_root, mount_point) = get_path_from_mountinfo().await?; + if mount_root == "/" { + mount_root = String::from(""); + } + let mount_root = pid_path.trim_start_matches(&mount_root).to_string(); + Ok((mount_root, mount_point)) +} + +async fn get_path_from_mountinfo() -> Result<(String, String)> { + let mountinfo_path = "/proc/self/mountinfo".to_string(); + let mountinfo_string = read_to_string(mountinfo_path.clone()) + .await + .map_err(io_error!(e, "open {}.", mountinfo_path.clone()))?; + + let line = mountinfo_string + .lines() + .find(|line| line.contains("cgroup") && line.contains("memory")) + .ok_or(Error::Other( + "Lines containers cgroup and memory not found in mountinfo".into(), + ))?; + + parse_memory_mountroot(line) +} + +fn parse_memory_mountroot(line: &str) -> Result<(String, String)> { + let mut columns = line.split_whitespace(); + columns.nth(2); + let mount_root = columns.next().ok_or(Error::Other( + "Invalid input information about mountinfo".into(), + ))?; + let mount_point = columns.next().ok_or(Error::Other( + "Invalid input information about mountinfo".into(), + ))?; + Ok((mount_root.to_string(), mount_point.to_string())) +} + +pub async fn register_memory_event( + key: &str, + cg_dir: &Path, + event_name: &str, +) -> Result> { + let path = cg_dir.join(event_name); + let event_file = fs::File::open(path.clone()) + .await + .map_err(other_error!(e, "Error get path:"))?; + + let eventfd = eventfd::eventfd(0, eventfd::EfdFlags::EFD_CLOEXEC)?; + + let event_control_path = cg_dir.join("cgroup.event_control"); + let data = format!("{} {}", eventfd.as_raw_fd(), event_file.as_raw_fd()); + fs::write(&event_control_path, data.clone()) + .await + .map_err(other_error!(e, "Error write eventfd:"))?; + + let mut buf = [0u8; 8]; + + let (sender, receiver) = mpsc::channel(128); + let key = key.to_string(); + + tokio::spawn(async move { + let mut eventfd_file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + loop { + match eventfd_file.read(&mut buf).await { + Ok(bytes_read) if bytes_read == 0 => return, + Err(_) => return, + _ => (), + } + if !Path::new(&event_control_path).exists() { + return; + } + sender.send(key.clone()).await.unwrap(); + } + }); + + Ok(receiver) +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use tokio::{fs::remove_file, io::AsyncWriteExt, process::Command}; + + use crate::cgroup_memory; + use cgroups_rs::{ + hierarchies::{self, is_cgroup2_unified_mode}, + memory::MemController, + Cgroup, CgroupPid, + }; + + #[tokio::test] + async fn test_cgroupv1_oom_monitor() { + if !is_cgroup2_unified_mode() { + // Create a memory cgroup with limits on both memory and swap. + let path = "cgroupv1_oom_monitor"; + let cg = Cgroup::new(hierarchies::auto(), path).unwrap(); + + let mem_controller: &MemController = cg.controller_of().unwrap(); + mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M + mem_controller.set_swappiness(0).unwrap(); + + // Create a sh sub process, and let it wait for the stdinput. + let mut child_process = Command::new("sh") + .stdin(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let pid = child_process.id().unwrap(); + + // Add the sh subprocess to the cgroup. + cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap(); + + // Set oom monitor + let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await.unwrap(); + let (mount_root, mount_point) = + cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup) + .await + .unwrap(); + + let mem_cgroup_path = mount_point + &mount_root; + let mut rx = cgroup_memory::register_memory_event( + &pid.to_string().as_str(), + Path::new(&mem_cgroup_path), + "memory.oom_control", + ) + .await + .unwrap(); + + // Exec the sh subprocess to a dd command that consumes more than 10M of memory. + if let Some(mut stdin) = child_process.stdin.take() { + stdin + .write_all( + b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n", + ) + .await + .unwrap(); + stdin.flush().await.unwrap(); + } + + // Wait for the oom message. + if let Some(item) = rx.recv().await { + assert_eq!(pid.to_string(), item, "Receive error oom message"); + } + + // Clean. + child_process.wait().await.unwrap(); + cg.delete().unwrap(); + remove_file("/tmp/test_oom_monitor_file").await.unwrap(); + } + } +} diff --git a/crates/runc-shim/src/main.rs b/crates/runc-shim/src/main.rs index e5b180f4..887c4e35 100644 --- a/crates/runc-shim/src/main.rs +++ b/crates/runc-shim/src/main.rs @@ -18,6 +18,7 @@ use std::env; use containerd_shim::{asynchronous::run, parse}; +mod cgroup_memory; mod common; mod console; mod container; diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/runc.rs index 92bb0f23..6e54fd3d 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/runc.rs @@ -740,8 +740,8 @@ mod tests { write_str_to_file(Path::new(test_dir).join(LOG_JSON_FILE).as_path(), log_json) .await .expect("write log json should not be error"); - let expectd_msg = "panic"; + let actual_err = runtime_error(test_dir, empty_err, "").await; remove_dir_all(test_dir) .await diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/task.rs index 9a861f8b..b99903c5 100644 --- a/crates/runc-shim/src/task.rs +++ b/crates/runc-shim/src/task.rs @@ -25,26 +25,37 @@ use containerd_shim::{ }, asynchronous::ExitSignal, event::Event, + other_error, protos::{ api::{ CloseIORequest, ConnectRequest, ConnectResponse, DeleteResponse, PidsRequest, PidsResponse, StatsRequest, StatsResponse, UpdateTaskRequest, }, - events::task::{TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskStart}, + events::task::{ + TaskCreate, TaskDelete, TaskExecAdded, TaskExecStarted, TaskIO, TaskOOM, TaskStart, + }, protobuf::MessageDyn, shim_async::Task, ttrpc, ttrpc::r#async::TtrpcContext, }, util::{convert_to_any, convert_to_timestamp, AsOption}, - TtrpcResult, + Error, TtrpcResult, }; use log::{debug, info, warn}; use oci_spec::runtime::LinuxResources; -use tokio::sync::{mpsc::Sender, MappedMutexGuard, Mutex, MutexGuard}; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + MappedMutexGuard, Mutex, MutexGuard, +}; +use tokio::task::spawn; use super::container::{Container, ContainerFactory}; +use crate::cgroup_memory; +use cgroups_rs::hierarchies::is_cgroup2_unified_mode; + +use std::path::Path; type EventSender = Sender<(String, Box)>; /// TaskService is a Task template struct, it is considered a helper struct, @@ -95,6 +106,22 @@ impl TaskService { } } +fn run_oom_monitor(mut rx: Receiver, id: String, tx: EventSender) { + let oom_event = TaskOOM { + container_id: id, + ..Default::default() + }; + let topic = oom_event.topic(); + let oom_box = Box::new(oom_event); + spawn(async move { + while let Some(_item) = rx.recv().await { + tx.send((topic.to_string(), oom_box.clone())) + .await + .unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e)); + } + }); +} + #[async_trait] impl Task for TaskService where @@ -164,6 +191,23 @@ where ..Default::default() }) .await; + std::thread::sleep(std::time::Duration::from_secs(30)); + if !is_cgroup2_unified_mode() { + let path_from_cgorup = cgroup_memory::get_path_from_cgorup(resp.pid).await?; + let (mount_root, mount_point) = + cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?; + + let mem_cgroup_path = mount_point + &mount_root; + let rx = cgroup_memory::register_memory_event( + &req.id, + Path::new(&mem_cgroup_path), + "memory.oom_control", + ) + .await + .map_err(other_error!(e, "register_memory_event failed:"))?; + + run_oom_monitor(rx, req.id.to_string(), self.tx.clone()); + } } else { self.send_event(TaskExecStarted { container_id: req.id.to_string(),