From c6f29ff7f75aa6e020e37e9232efb9b3ee790e0d Mon Sep 17 00:00:00 2001
From: Qiao
Date: Fri, 29 Dec 2023 11:45:52 +0800
Subject: [PATCH] add oom monitor for rustshim

---
 crates/runc-shim/Cargo.toml           |   4 +
 crates/runc-shim/src/cgroup_memory.rs | 247 ++++++++++++++++++++++++++
 crates/runc-shim/src/main.rs          |   1 +
 crates/runc-shim/src/task.rs          |  63 +++++++-
 4 files changed, 313 insertions(+), 2 deletions(-)
 create mode 100644 crates/runc-shim/src/cgroup_memory.rs

diff --git a/crates/runc-shim/Cargo.toml b/crates/runc-shim/Cargo.toml
index e65ae7ef..8a573f5e 100644
--- a/crates/runc-shim/Cargo.toml
+++ b/crates/runc-shim/Cargo.toml
@@ -41,3 +41,7 @@ uuid.workspace = true
 async-trait.workspace = true
 futures.workspace = true
 tokio = { workspace = true, features = ["full"] }
+
+[target.'cfg(target_os = "linux")'.dependencies]
+cgroups-rs = "0.3.3"
+nix = { workspace = true, features = ["event"] }
diff --git a/crates/runc-shim/src/cgroup_memory.rs b/crates/runc-shim/src/cgroup_memory.rs
new file mode 100644
index 00000000..47b88a13
--- /dev/null
+++ b/crates/runc-shim/src/cgroup_memory.rs
@@ -0,0 +1,247 @@
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+#![cfg(target_os = "linux")]
+
+use std::{
+    os::unix::io::{AsRawFd, FromRawFd, IntoRawFd},
+    path::Path,
+};
+
+use containerd_shim::{
+    error::{Error, Result},
+    io_error, other_error,
+};
+use nix::sys::eventfd;
+use tokio::{
+    fs::{self, read_to_string, File},
+    io::AsyncReadExt,
+    sync::mpsc::{self, Receiver},
+};
+
+/// Returns the memory cgroup path of process `pid`.
+///
+/// Reads `/proc/<pid>/cgroup` and returns everything that follows the `:memory:`
+/// marker on the first line that carries it.
+///
+/// # Errors
+///
+/// Fails when the file cannot be read or when no line contains `:memory:`.
+pub async fn get_path_from_cgorup(pid: u32) -> Result<String> {
+    let proc_path = format!("/proc/{}/cgroup", pid);
+    let path_string = read_to_string(&proc_path)
+        .await
+        .map_err(io_error!(e, "open {}.", &proc_path))?;
+
+    // Match on the exact `:memory:` marker so that a line which merely mentions
+    // "memory" somewhere else cannot shadow the real controller entry.
+    let (_, path) = path_string
+        .lines()
+        .find_map(|line| line.split_once(":memory:"))
+        .ok_or_else(|| Error::Other("Memory line not found".into()))?;
+
+    Ok(path.to_string())
+}
+
+/// Resolves a memory cgroup path against the mounted memory hierarchy.
+///
+/// Returns `(relative_path, mount_point)`: `relative_path` is `pid_path` with the
+/// mount root of the memory cgroup mount removed from its front, `mount_point` is
+/// the directory that mount is attached to.
+///
+/// # Errors
+///
+/// Propagates failures from reading or parsing `/proc/self/mountinfo`.
+pub async fn get_existing_cgroup_mem_path(pid_path: String) -> Result<(String, String)> {
+    let (mount_root, mount_point) = get_path_from_mountinfo().await?;
+    // A root of "/" leaves the path untouched. Any other root is stripped exactly
+    // once: `trim_start_matches` would also swallow repeated occurrences, turning
+    // "/a/a/b" into "/b" for a root of "/a".
+    let relative_path = if mount_root == "/" {
+        None
+    } else {
+        pid_path.strip_prefix(mount_root.as_str()).map(str::to_owned)
+    };
+    Ok((relative_path.unwrap_or(pid_path), mount_point))
+}
+
+/// Finds the memory cgroup mount in `/proc/self/mountinfo`.
+///
+/// Returns the `(mount_root, mount_point)` columns of the first line that
+/// mentions both "cgroup" and "memory".
+async fn get_path_from_mountinfo() -> Result<(String, String)> {
+    let mountinfo_path = "/proc/self/mountinfo";
+    let mountinfo_string = read_to_string(mountinfo_path)
+        .await
+        .map_err(io_error!(e, "open {}.", mountinfo_path))?;
+
+    let line = mountinfo_string
+        .lines()
+        .find(|line| line.contains("cgroup") && line.contains("memory"))
+        .ok_or_else(|| {
+            Error::Other("Line containing cgroup and memory not found in mountinfo".into())
+        })?;
+
+    parse_memory_mountroot(line)
+}
+
+/// Extracts the mount root and mount point from one `mountinfo` line.
+///
+/// The line is split on whitespace; the fourth column is returned as the mount
+/// root and the fifth as the mount point.
+fn parse_memory_mountroot(line: &str) -> Result<(String, String)> {
+    let invalid = || Error::Other("Invalid input information about mountinfo".into());
+    let mut columns = line.split_whitespace();
+    let mount_root = columns.nth(3).ok_or_else(invalid)?;
+    let mount_point = columns.next().ok_or_else(invalid)?;
+    Ok((mount_root.to_string(), mount_point.to_string()))
+}
+
+/// Registers a cgroup v1 memory event and forwards its notifications over a channel.
+///
+/// Opens `cg_dir/event_name`, creates an eventfd and writes
+/// `"<eventfd> <event file fd>"` to `cg_dir/cgroup.event_control`. A background task
+/// then sends a copy of `key` on the returned channel whenever a read of the
+/// eventfd yields data.
+///
+/// The task ends when that read fails or yields nothing, when
+/// `cgroup.event_control` can no longer be found, or when the receiver is dropped.
+///
+/// # Errors
+///
+/// Fails when the event file cannot be opened, the eventfd cannot be created, or
+/// the registration cannot be written.
+pub async fn register_memory_event(
+    key: &str,
+    cg_dir: &Path,
+    event_name: &str,
+) -> Result<Receiver<String>> {
+    let event_file = fs::File::open(cg_dir.join(event_name))
+        .await
+        .map_err(other_error!(e, "Error get path:"))?;
+
+    let eventfd = eventfd::eventfd(0, eventfd::EfdFlags::EFD_CLOEXEC)?;
+
+    let event_control_path = cg_dir.join("cgroup.event_control");
+    let data = format!("{} {}", eventfd.as_raw_fd(), event_file.as_raw_fd());
+    fs::write(&event_control_path, data)
+        .await
+        .map_err(other_error!(e, "Error write eventfd:"))?;
+
+    // SAFETY: `into_raw_fd` gives up the eventfd handle, so the `File` is the only
+    // owner of the descriptor and closes it exactly once. Wrapping `as_raw_fd()`
+    // instead would leave two owners if `eventfd` is an owning type.
+    let mut eventfd_file = unsafe { File::from_raw_fd(eventfd.into_raw_fd()) };
+
+    let (sender, receiver) = mpsc::channel(128);
+    let key = key.to_string();
+
+    tokio::spawn(async move {
+        let mut buf = [0u8; 8];
+        loop {
+            match eventfd_file.read(&mut buf).await {
+                Ok(0) | Err(_) => return,
+                Ok(_) => (),
+            }
+            // Stop watching once the control file is gone.
+            if fs::metadata(&event_control_path).await.is_err() {
+                return;
+            }
+            // A closed channel means nobody listens any more: end the task quietly
+            // instead of panicking.
+            if sender.send(key.clone()).await.is_err() {
+                return;
+            }
+        }
+    });
+
+    Ok(receiver)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::Path;
+
+    use cgroups_rs::{
+        hierarchies::{self, is_cgroup2_unified_mode},
+        memory::MemController,
+        Cgroup, CgroupPid,
+    };
+    use tokio::{fs::remove_file, io::AsyncWriteExt, process::Command};
+
+    use crate::cgroup_memory;
+
+    #[tokio::test]
+    async fn test_cgroupv1_oom_monitor() {
+        if !is_cgroup2_unified_mode() {
+            // Create a memory cgroup with limits on both memory and swap.
+            let path = "cgroupv1_oom_monitor";
+            let cg = Cgroup::new(hierarchies::auto(), path).unwrap();
+
+            let mem_controller: &MemController = cg.controller_of().unwrap();
+            mem_controller.set_limit(10 * 1024 * 1024).unwrap(); // 10M
+            mem_controller.set_swappiness(0).unwrap();
+
+            // Create a sh sub process, and let it wait for the stdinput.
+            let mut child_process = Command::new("sh")
+                .stdin(std::process::Stdio::piped())
+                .spawn()
+                .unwrap();
+
+            let pid = child_process.id().unwrap();
+
+            // Add the sh subprocess to the cgroup.
+            cg.add_task_by_tgid(CgroupPid::from(pid as u64)).unwrap();
+
+            // Set oom monitor
+            let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await.unwrap();
+            let (mount_root, mount_point) =
+                cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup)
+                    .await
+                    .unwrap();
+
+            let mem_cgroup_path = mount_point + &mount_root;
+            let mut rx = cgroup_memory::register_memory_event(
+                pid.to_string().as_str(),
+                Path::new(&mem_cgroup_path),
+                "memory.oom_control",
+            )
+            .await
+            .unwrap();
+
+            // Exec the sh subprocess to a dd command that consumes more than 10M of memory.
+            if let Some(mut stdin) = child_process.stdin.take() {
+                stdin
+                    .write_all(
+                        b"exec dd if=/dev/zero of=/tmp/test_oom_monitor_file bs=11M count=1\n",
+                    )
+                    .await
+                    .unwrap();
+                stdin.flush().await.unwrap();
+            }
+
+            // Wait for the oom message.
+            if let Some(item) = rx.recv().await {
+                assert_eq!(pid.to_string(), item, "Receive error oom message");
+            }
+
+            // Clean.
+            child_process.wait().await.unwrap();
+            cg.delete().unwrap();
+            remove_file("/tmp/test_oom_monitor_file").await.unwrap();
+        }
+    }
+}
diff --git a/crates/runc-shim/src/main.rs b/crates/runc-shim/src/main.rs
index e5b180f4..887c4e35 100644
--- a/crates/runc-shim/src/main.rs
+++ b/crates/runc-shim/src/main.rs
@@ -18,6 +18,7 @@ use std::env;
 
 use containerd_shim::{asynchronous::run, parse};
 
+mod cgroup_memory;
 mod common;
 mod console;
 mod container;
diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/task.rs
index 9a861f8b..eec1237d 100644
--- a/crates/runc-shim/src/task.rs
+++ b/crates/runc-shim/src/task.rs
@@ -13,7 +13,6 @@
    See the License for the specific language governing permissions and
    limitations under the License.
*/ - use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; @@ -44,9 +43,27 @@ use oci_spec::runtime::LinuxResources; use tokio::sync::{mpsc::Sender, MappedMutexGuard, Mutex, MutexGuard}; use super::container::{Container, ContainerFactory}; - type EventSender = Sender<(String, Box)>; +#[cfg(target_os = "linux")] +use std::path::Path; + +#[cfg(target_os = "linux")] +use cgroups_rs::hierarchies::is_cgroup2_unified_mode; +#[cfg(target_os = "linux")] +use containerd_shim::{ + error::{Error, Result}, + other_error, + protos::events::task::TaskOOM, +}; +#[cfg(target_os = "linux")] +use log::error; +#[cfg(target_os = "linux")] +use tokio::{sync::mpsc::Receiver, task::spawn}; + +#[cfg(target_os = "linux")] +use crate::cgroup_memory; + /// TaskService is a Task template struct, it is considered a helper struct, /// which has already implemented `Task` trait, so that users can make it the type `T` /// parameter of `Service`, and implements their own `ContainerFactory` and `Container`. 
@@ -95,6 +112,64 @@ impl TaskService {
     }
 }
 
+/// Forwards every notification received on `rx` to the event publisher as a
+/// `TaskOOM` event for container `id`.
+///
+/// The forwarding runs in a spawned task that ends once `rx` is closed. A failure
+/// to hand an event to `tx` is logged and does not stop the loop.
+#[cfg(target_os = "linux")]
+fn run_oom_monitor(mut rx: Receiver<String>, id: String, tx: EventSender) {
+    let oom_event = TaskOOM {
+        container_id: id,
+        ..Default::default()
+    };
+    let topic = oom_event.topic();
+    let oom_box = Box::new(oom_event);
+    spawn(async move {
+        while rx.recv().await.is_some() {
+            if let Err(e) = tx.send((topic.to_string(), oom_box.clone())).await {
+                warn!("send {} to publisher: {}", topic, e);
+            }
+        }
+    });
+}
+
+/// Starts OOM monitoring for the process `pid` of container `id`.
+///
+/// Does nothing when the host runs in cgroup v2 unified mode. Otherwise the
+/// memory cgroup directory of `pid` is resolved, `memory.oom_control` is
+/// registered as a memory event and the resulting notifications are forwarded
+/// to `tx` as `TaskOOM` events.
+///
+/// # Errors
+///
+/// Returns an error when the cgroup path cannot be resolved or the event cannot
+/// be registered.
+#[cfg(target_os = "linux")]
+async fn monitor_oom(id: &str, pid: u32, tx: EventSender) -> Result<()> {
+    if is_cgroup2_unified_mode() {
+        return Ok(());
+    }
+
+    let path_from_cgorup = cgroup_memory::get_path_from_cgorup(pid).await?;
+    let (relative_path, mount_point) =
+        cgroup_memory::get_existing_cgroup_mem_path(path_from_cgorup).await?;
+
+    // Plain concatenation on purpose: `relative_path` is expected to start with
+    // '/', and `Path::join` would replace the mount point with it.
+    let mem_cgroup_path = mount_point + &relative_path;
+    let rx = cgroup_memory::register_memory_event(
+        id,
+        Path::new(&mem_cgroup_path),
+        "memory.oom_control",
+    )
+    .await
+    .map_err(other_error!(e, "register_memory_event failed:"))?;
+
+    run_oom_monitor(rx, id.to_string(), tx);
+    Ok(())
+}
+
 #[async_trait]
 impl Task for TaskService
 where
@@ -164,6 +239,10 @@ where
                 ..Default::default()
             })
             .await;
+            #[cfg(target_os = "linux")]
+            if let Err(e) = monitor_oom(&req.id, resp.pid, self.tx.clone()).await {
+                error!("monitor_oom failed: {:?}.", e);
+            }
         } else {
             self.send_event(TaskExecStarted {
                 container_id: req.id.to_string(),